summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-10-28 18:05:38 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-10-28 18:05:38 +0100
commit45adb5ec32c09f6cc8e105d55d86b9d07ddebb39 (patch)
treef0b3271873dbc4a0e0dac7127e9991a123460ef9
parent46ca7002550b19f8bfe32586ca810c2b719f9261 (diff)
parent14d29d6bf7a9868465ef7219bc918ebab29b77e4 (diff)
downloadrabbitmq-server-45adb5ec32c09f6cc8e105d55d86b9d07ddebb39.tar.gz
merge bug24513 into default
-rw-r--r--Makefile1
-rw-r--r--packaging/common/rabbitmq-server.init6
-rwxr-xr-xquickcheck9
-rw-r--r--src/mirrored_supervisor_tests.erl2
-rw-r--r--src/rabbit_alarm.erl6
-rw-r--r--src/rabbit_backing_queue_qc.erl46
-rw-r--r--src/rabbit_control.erl5
-rw-r--r--src/rabbit_memory_monitor.erl20
-rw-r--r--src/rabbit_msg_store.erl263
-rw-r--r--src/rabbit_tests.erl47
-rw-r--r--src/rabbit_variable_queue.erl2
11 files changed, 282 insertions, 125 deletions
diff --git a/Makefile b/Makefile
index 146b6335..2261fcaa 100644
--- a/Makefile
+++ b/Makefile
@@ -368,3 +368,4 @@ ifneq "$(strip $(patsubst clean%,,$(patsubst %clean,,$(TESTABLEGOALS))))" ""
endif
.PHONY: run-qc
+
diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init
index 15fd5d5b..c59af6c1 100644
--- a/packaging/common/rabbitmq-server.init
+++ b/packaging/common/rabbitmq-server.init
@@ -56,8 +56,10 @@ start_rabbitmq () {
RETVAL=0
ensure_pid_dir
set +e
- setsid sh -c "RABBITMQ_PID_FILE=$PID_FILE $DAEMON > \
- ${INIT_LOG_DIR}/startup_log 2> ${INIT_LOG_DIR}/startup_err" &
+ RABBITMQ_PID_FILE=$PID_FILE setsid $DAEMON \
+ > "${INIT_LOG_DIR}/startup_log" \
+ 2> "${INIT_LOG_DIR}/startup_err" \
+ 0<&- &
$CONTROL wait $PID_FILE >/dev/null 2>&1
RETVAL=$?
set -e
diff --git a/quickcheck b/quickcheck
index a36cf3ed..b5382d75 100755
--- a/quickcheck
+++ b/quickcheck
@@ -6,15 +6,16 @@
%% A helper to test quickcheck properties on a running broker
%% NodeStr is a local broker node name
%% ModStr is the module containing quickcheck properties
-%% The number of trials is optional
-main([NodeStr, ModStr | TrialsStr]) ->
+%% TrialsStr is the number of trials
+main([NodeStr, ModStr, TrialsStr]) ->
{ok, Hostname} = inet:gethostname(),
Node = list_to_atom(NodeStr ++ "@" ++ Hostname),
Mod = list_to_atom(ModStr),
- Trials = lists:map(fun erlang:list_to_integer/1, TrialsStr),
+ Trials = erlang:list_to_integer(TrialsStr),
case rpc:call(Node, code, ensure_loaded, [proper]) of
{module, proper} ->
- case rpc:call(Node, proper, module, [Mod] ++ Trials) of
+ case rpc:call(Node, proper, module,
+ [Mod] ++ [[{numtests, Trials}, {constraint_tries, 200}]]) of
[] -> ok;
_ -> quit(1)
end;
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index 5e782a08..0900f56f 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -202,7 +202,7 @@ with_sups(Fun, Sups) ->
Pids = [begin {ok, Pid} = start_sup(Sup), Pid end || Sup <- Sups],
Fun(Pids),
[kill(Pid) || Pid <- Pids, is_process_alive(Pid)],
- timer:sleep(100),
+ timer:sleep(500),
passed.
start_sup(Spec) ->
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index d38ecb91..fd03ca85 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -45,11 +45,7 @@
start() ->
ok = alarm_handler:add_alarm_handler(?MODULE, []),
{ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
- ok = case MemoryWatermark == 0 of
- true -> ok;
- false -> rabbit_sup:start_restartable_child(vm_memory_monitor,
- [MemoryWatermark])
- end,
+ rabbit_sup:start_restartable_child(vm_memory_monitor, [MemoryWatermark]),
ok.
stop() ->
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 095202dd..c61184a6 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -34,14 +34,15 @@
-export([initial_state/0, command/1, precondition/2, postcondition/3,
next_state/3]).
--export([prop_backing_queue_test/0, publish_multiple/4, timeout/2]).
+-export([prop_backing_queue_test/0, publish_multiple/1, timeout/2]).
-record(state, {bqstate,
len, %% int
next_seq_id, %% int
messages, %% gb_trees of seqid => {msg_props, basic_msg}
acks, %% [{acktag, {seqid, {msg_props, basic_msg}}}]
- confirms}). %% set of msgid
+ confirms, %% set of msgid
+ publishing}).%% int
%% Initialise model
@@ -51,7 +52,8 @@ initial_state() ->
next_seq_id = 0,
messages = gb_trees:empty(),
acks = [],
- confirms = gb_sets:new()}.
+ confirms = gb_sets:new(),
+ publishing = 0}.
%% Property
@@ -112,10 +114,8 @@ qc_publish(#state{bqstate = BQ}) ->
expiry = oneof([undefined | lists:seq(1, 10)])},
self(), BQ]}.
-qc_publish_multiple(#state{bqstate = BQ}) ->
- {call, ?MODULE, publish_multiple,
- [qc_message(), #message_properties{}, BQ,
- resize(?QUEUE_MAXLEN, pos_integer())]}.
+qc_publish_multiple(#state{}) ->
+ {call, ?MODULE, publish_multiple, [resize(?QUEUE_MAXLEN, pos_integer())]}.
qc_publish_delivered(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish_delivered,
@@ -128,8 +128,7 @@ qc_ack(#state{bqstate = BQ, acks = Acks}) ->
{call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}.
qc_requeue(#state{bqstate = BQ, acks = Acks}) ->
- {call, ?BQMOD, requeue,
- [rand_choice(proplists:get_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}.
+ {call, ?BQMOD, requeue, [rand_choice(proplists:get_keys(Acks)), BQ]}.
qc_set_ram_duration_target(#state{bqstate = BQ}) ->
{call, ?BQMOD, set_ram_duration_target,
@@ -155,6 +154,10 @@ qc_purge(#state{bqstate = BQ}) ->
%% Preconditions
+%% Create long queues by only allowing publishing
+precondition(#state{publishing = Count}, {call, _Mod, Fun, _Arg})
+ when Count > 0, Fun /= publish ->
+ false;
precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg})
when Fun =:= ack; Fun =:= requeue ->
length(Acks) > 0;
@@ -174,6 +177,7 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
#state{len = Len,
messages = Messages,
confirms = Confirms,
+ publishing = PublishCount,
next_seq_id = NextSeq} = S,
MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]},
NeedsConfirm =
@@ -183,21 +187,15 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
len = Len + 1,
next_seq_id = NextSeq + 1,
messages = gb_trees:insert(NextSeq, {MsgProps, Msg}, Messages),
+ publishing = {call, erlang, max, [0, {call, erlang, '-',
+ [PublishCount, 1]}]},
confirms = case eval(NeedsConfirm) of
true -> gb_sets:add(MsgId, Confirms);
_ -> Confirms
end};
-next_state(S, BQ, {call, _, publish_multiple, [Msg, MsgProps, _BQ, Count]}) ->
- #state{len = Len, messages = Messages} = S,
- {S1, Msgs1} = repeat({S, Messages},
- fun ({#state{next_seq_id = NextSeq} = State, Msgs}) ->
- {State #state { next_seq_id = NextSeq + 1},
- gb_trees:insert(NextSeq, {MsgProps, Msg}, Msgs)}
- end, Count),
- S1#state{bqstate = BQ,
- len = Len + Count,
- messages = Msgs1};
+next_state(S, _BQ, {call, ?MODULE, publish_multiple, [PublishCount]}) ->
+ S#state{publishing = PublishCount};
next_state(S, Res,
{call, ?BQMOD, publish_delivered,
@@ -245,7 +243,7 @@ next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) ->
S#state{bqstate = BQ1,
acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)};
-next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) ->
+next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _V]}) ->
#state{messages = Messages, acks = AcksState} = S,
BQ1 = {call, erlang, element, [2, Res]},
Messages1 = lists:foldl(fun (AckTag, Msgs) ->
@@ -322,12 +320,8 @@ postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
%% Helpers
-repeat(Result, _Fun, 0) -> Result;
-repeat(Result, Fun, Times) -> repeat(Fun(Result), Fun, Times - 1).
-
-publish_multiple(Msg, MsgProps, BQ, Count) ->
- repeat(BQ, fun(BQ1) -> ?BQMOD:publish(Msg, MsgProps, self(), BQ1) end,
- Count).
+publish_multiple(_C) ->
+ ok.
timeout(BQ, 0) ->
BQ;
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 905e4fd0..66a5ab5b 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -325,7 +325,10 @@ action(trace_off, Node, [], Opts, Inform) ->
rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]);
action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) ->
- Frac = list_to_float(Arg),
+ Frac = list_to_float(case string:chr(Arg, $.) of
+ 0 -> Arg ++ ".0";
+ _ -> Arg
+ end),
Inform("Setting memory threshhold on ~p to ~p", [Node, Frac]),
rpc_call(Node, vm_memory_monitor, set_vm_memory_high_watermark, [Frac]);
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 3deb9580..02f3158f 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -211,17 +211,19 @@ internal_update(State = #state { queue_durations = Durations,
queue_duration_sum = Sum,
queue_duration_count = Count }) ->
MemoryLimit = ?MEMORY_LIMIT_SCALING * vm_memory_monitor:get_memory_limit(),
- MemoryRatio = erlang:memory(total) / MemoryLimit,
+ MemoryRatio = case MemoryLimit > 0.0 of
+ true -> erlang:memory(total) / MemoryLimit;
+ false -> infinity
+ end,
DesiredDurationAvg1 =
- case MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 of
- true ->
+ if MemoryRatio =:= infinity ->
+ 0.0;
+ MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 ->
infinity;
- false ->
- Sum1 = case MemoryRatio < ?SUM_INC_THRESHOLD of
- true -> Sum + ?SUM_INC_AMOUNT;
- false -> Sum
- end,
- (Sum1 / Count) / MemoryRatio
+ MemoryRatio < ?SUM_INC_THRESHOLD ->
+ ((Sum + ?SUM_INC_AMOUNT) / Count) / MemoryRatio;
+ true ->
+ (Sum / Count) / MemoryRatio
end,
State1 = State #state { desired_duration = DesiredDurationAvg1 },
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index d67c30a3..e6a32b90 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -68,6 +68,7 @@
file_handles_ets, %% tid of the shared file handles table
file_summary_ets, %% tid of the file summary table
cur_file_cache_ets, %% tid of current file cache table
+ flying_ets, %% tid of writes/removes in flight
dying_clients, %% set of dying clients
clients, %% map of references of all registered clients
%% to callbacks
@@ -86,7 +87,8 @@
gc_pid,
file_handles_ets,
file_summary_ets,
- cur_file_cache_ets
+ cur_file_cache_ets,
+ flying_ets
}).
-record(file_summary,
@@ -128,12 +130,13 @@
gc_pid :: pid(),
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
- cur_file_cache_ets :: ets:tid()}).
+ cur_file_cache_ets :: ets:tid(),
+ flying_ets :: ets:tid()}).
-type(msg_ref_delta_gen(A) ::
fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A})).
-type(maybe_msg_id_fun() ::
- 'undefined' | fun ((gb_set(), 'written' | 'removed') -> any())).
+ 'undefined' | fun ((gb_set(), 'written' | 'ignored') -> any())).
-type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')).
-type(deletion_thunk() :: fun (() -> boolean())).
@@ -375,6 +378,45 @@
%% performance with many healthy clients and few, if any, dying
%% clients, which is the typical case.
%%
+%% When the msg_store has a backlog (i.e. it has unprocessed messages
+%% in its mailbox / gen_server priority queue), a further optimisation
+%% opportunity arises: we can eliminate pairs of 'write' and 'remove'
+%% from the same client for the same message. A typical occurrence of
+%% these is when an empty durable queue delivers persistent messages
+%% to ack'ing consumers. The queue will asynchronously ask the
+%% msg_store to 'write' such messages, and when they are acknowledged
+%% it will issue a 'remove'. That 'remove' may be issued before the
+%% msg_store has processed the 'write'. There is then no point going
+%% ahead with the processing of that 'write'.
+%%
+%% To detect this situation a 'flying_ets' table is shared between the
+%% clients and the server. The table is keyed on the combination of
+%% client (reference) and msg id, and the value represents an
+%% integration of all the writes and removes currently "in flight" for
+%% that message between the client and server - '+1' means all the
+%% writes/removes add up to a single 'write', '-1' to a 'remove', and
+%% '0' to nothing. (NB: the integration can never add up to more than
+%% one 'write' or 'read' since clients must not write/remove a message
+%% more than once without first removing/writing it).
+%%
+%% Maintaining this table poses two challenges: 1) both the clients
+%% and the server access and update the table, which causes
+%% concurrency issues, 2) we must ensure that entries do not stay in
+%% the table forever, since that would constitute a memory leak. We
+%% address the former by carefully modelling all operations as
+%% sequences of atomic actions that produce valid results in all
+%% possible interleavings. We address the latter by deleting table
+%% entries whenever the server finds a 0-valued entry during the
+%% processing of a write/remove. 0 is essentially equivalent to "no
+%% entry". If, OTOH, the value is non-zero we know there is at least
+%% one other 'write' or 'remove' in flight, so we get an opportunity
+%% later to delete the table entry when processing these.
+%%
+%% There are two further complications. We need to ensure that 1)
+%% eliminated writes still get confirmed, and 2) the write-back cache
+%% doesn't grow unbounded. These are quite straightforward to
+%% address. See the comments in the code.
+%%
%% For notes on Clean Shutdown and startup, see documentation in
%% variable_queue.
@@ -392,7 +434,7 @@ successfully_recovered_state(Server) ->
client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, CurFileCacheEts} =
+ FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} =
gen_server2:call(
Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
#client_msstate { server = Server,
@@ -404,7 +446,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts }.
+ cur_file_cache_ets = CurFileCacheEts,
+ flying_ets = FlyingEts }.
client_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
@@ -420,6 +463,7 @@ client_ref(#client_msstate { client_ref = Ref }) -> Ref.
write(MsgId, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
client_ref = CRef }) ->
+ ok = client_update_flying(+1, MsgId, CState),
ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
ok = server_cast(CState, {write, CRef, MsgId}).
@@ -440,6 +484,7 @@ read(MsgId,
contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
remove([], _CState) -> ok;
remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
+ [client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds],
server_cast(CState, {remove, CRef, MsgIds}).
set_maximum_since_use(Server, Age) ->
@@ -566,6 +611,21 @@ client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
end
end.
+client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
+ client_ref = CRef }) ->
+ Key = {MsgId, CRef},
+ case ets:insert_new(FlyingEts, {Key, Diff}) of
+ true -> ok;
+ false -> try ets:update_counter(FlyingEts, Key, {2, Diff})
+ catch error:badarg ->
+ %% this is guaranteed to succeed since the
+ %% server only removes and updates flying_ets
+ %% entries; it never inserts them
+ true = ets:insert_new(FlyingEts, {Key, Diff})
+ end,
+ ok
+ end.
+
clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
dying_clients = DyingClients }) ->
State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
@@ -619,6 +679,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
+ FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]),
{ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
@@ -645,6 +706,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
+ flying_ets = FlyingEts,
dying_clients = sets:new(),
clients = Clients,
successfully_recovered = CleanShutdown,
@@ -700,11 +762,13 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
+ flying_ets = FlyingEts,
clients = Clients,
gc_pid = GCPid }) ->
Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
- CurFileCacheEts}, State #msstate { clients = Clients1 });
+ CurFileCacheEts, FlyingEts},
+ State #msstate { clients = Clients1 });
handle_call({client_terminate, CRef}, _From, State) ->
reply(ok, clear_client(CRef, State));
@@ -723,40 +787,54 @@ handle_cast({client_dying, CRef},
noreply(write_message(CRef, <<>>,
State #msstate { dying_clients = DyingClients1 }));
-handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) ->
+handle_cast({client_delete, CRef},
+ State = #msstate { clients = Clients }) ->
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
handle_cast({write, CRef, MsgId},
State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
- [{MsgId, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, MsgId),
- noreply(
- case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of
- {write, State1} ->
- write_message(CRef, MsgId, Msg, State1);
- {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
- State1;
- {ignore, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
- State1;
- {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
- record_pending_confirm(CRef, MsgId, State1);
- {confirm, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
- update_pending_confirms(
- fun (MsgOnDiskFun, CTM) ->
- MsgOnDiskFun(gb_sets:singleton(MsgId), written),
- CTM
- end, CRef, State1)
- end);
+ case update_flying(-1, MsgId, CRef, State) of
+ process ->
+ [{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId),
+ noreply(write_message(MsgId, Msg, CRef, State));
+ ignore ->
+ %% A 'remove' has already been issued and eliminated the
+ %% 'write'.
+ State1 = blind_confirm(CRef, gb_sets:singleton(MsgId),
+ ignored, State),
+ %% If all writes get eliminated, cur_file_cache_ets could
+ %% grow unbounded. To prevent that we delete the cache
+ %% entry here, but only if the message isn't in the
+ %% current file. That way reads of the message can
+ %% continue to be done client side, from either the cache
+ %% or the non-current files. If the message *is* in the
+ %% current file then the cache entry will be removed by
+ %% the normal logic for that in write_message/4 and
+ %% maybe_roll_to_new_file/2.
+ case index_lookup(MsgId, State1) of
+ [#msg_location { file = File }]
+ when File == State1 #msstate.current_file ->
+ ok;
+ _ ->
+ true = ets:match_delete(CurFileCacheEts, {MsgId, '_', 0})
+ end,
+ noreply(State1)
+ end;
handle_cast({remove, CRef, MsgIds}, State) ->
- State1 = lists:foldl(
- fun (MsgId, State2) -> remove_message(MsgId, CRef, State2) end,
- State, MsgIds),
- noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds),
- removed, State1)));
+ {RemovedMsgIds, State1} =
+ lists:foldl(
+ fun (MsgId, {Removed, State2}) ->
+ case update_flying(+1, MsgId, CRef, State2) of
+ process -> {[MsgId | Removed],
+ remove_message(MsgId, CRef, State2)};
+ ignore -> {Removed, State2}
+ end
+ end, {[], State}, MsgIds),
+ noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(RemovedMsgIds),
+ ignored, State1)));
handle_cast({combine_files, Source, Destination, Reclaimed},
State = #msstate { sum_file_size = SumFileSize,
@@ -797,6 +875,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
+ flying_ets = FlyingEts,
clients = Clients,
dir = Dir }) ->
%% stop the gc first, otherwise it could be working and we pull
@@ -810,8 +889,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
end,
State3 = close_all_handles(State1),
ok = store_file_summary(FileSummaryEts, Dir),
- [true = ets:delete(T) ||
- T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]],
+ [true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts,
+ CurFileCacheEts, FlyingEts]],
IndexModule:terminate(IndexState),
ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
{index_module, IndexModule}], Dir),
@@ -874,6 +953,19 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
client_confirm(CRef, MsgIds, written, StateN)
end, State1, CGs).
+update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) ->
+ Key = {MsgId, CRef},
+ NDiff = -Diff,
+ case ets:lookup(FlyingEts, Key) of
+ [] -> ignore;
+ [{_, Diff}] -> ignore;
+ [{_, NDiff}] -> ets:update_counter(FlyingEts, Key, {2, Diff}),
+ true = ets:delete_object(FlyingEts, {Key, 0}),
+ process;
+ [{_, 0}] -> true = ets:delete_object(FlyingEts, {Key, 0}),
+ ignore
+ end.
+
write_action({true, not_found}, _MsgId, State) ->
{ignore, undefined, State};
write_action({true, #msg_location { file = File }}, _MsgId, State) ->
@@ -905,8 +997,65 @@ write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
%% field otherwise bad interaction with concurrent GC
{confirm, File, State}.
-write_message(CRef, MsgId, Msg, State) ->
- write_message(MsgId, Msg, record_pending_confirm(CRef, MsgId, State)).
+write_message(MsgId, Msg, CRef,
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of
+ {write, State1} ->
+ write_message(MsgId, Msg,
+ record_pending_confirm(CRef, MsgId, State1));
+ {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
+ State1;
+ {ignore, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
+ State1;
+ {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
+ record_pending_confirm(CRef, MsgId, State1);
+ {confirm, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTM) ->
+ MsgOnDiskFun(gb_sets:singleton(MsgId), written),
+ CTM
+ end, CRef, State1)
+ end.
+
+remove_message(MsgId, CRef,
+ State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ case should_mask_action(CRef, MsgId, State) of
+ {true, _Location} ->
+ State;
+ {false_if_increment, #msg_location { ref_count = 0 }} ->
+ %% CRef has tried to both write and remove this msg whilst
+ %% it's being GC'd.
+ %%
+ %% ASSERTION: [#file_summary { locked = true }] =
+ %% ets:lookup(FileSummaryEts, File),
+ State;
+ {_Mask, #msg_location { ref_count = RefCount, file = File,
+ total_size = TotalSize }}
+ when RefCount > 0 ->
+ %% only update field, otherwise bad interaction with
+ %% concurrent GC
+ Dec = fun () -> index_update_ref_count(
+ MsgId, RefCount - 1, State) end,
+ case RefCount of
+ %% don't remove from cur_file_cache_ets here because
+ %% there may be further writes in the mailbox for the
+ %% same msg.
+ 1 -> case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { locked = true }] ->
+ add_to_pending_gc_completion(
+ {remove, MsgId, CRef}, File, State);
+ [#file_summary {}] ->
+ ok = Dec(),
+ delete_file_if_empty(
+ File, adjust_valid_total_size(
+ File, -TotalSize, State))
+ end;
+ _ -> ok = Dec(),
+ State
+ end
+ end.
write_message(MsgId, Msg,
State = #msstate { current_file_handle = CurHdl,
@@ -1004,43 +1153,6 @@ contains_message(MsgId, From,
end
end.
-remove_message(MsgId, CRef,
- State = #msstate { file_summary_ets = FileSummaryEts }) ->
- case should_mask_action(CRef, MsgId, State) of
- {true, _Location} ->
- State;
- {false_if_increment, #msg_location { ref_count = 0 }} ->
- %% CRef has tried to both write and remove this msg
- %% whilst it's being GC'd. ASSERTION:
- %% [#file_summary { locked = true }] =
- %% ets:lookup(FileSummaryEts, File),
- State;
- {_Mask, #msg_location { ref_count = RefCount, file = File,
- total_size = TotalSize }} when RefCount > 0 ->
- %% only update field, otherwise bad interaction with
- %% concurrent GC
- Dec = fun () ->
- index_update_ref_count(MsgId, RefCount - 1, State)
- end,
- case RefCount of
- %% don't remove from CUR_FILE_CACHE_ETS_NAME here
- %% because there may be further writes in the mailbox
- %% for the same msg.
- 1 -> case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true }] ->
- add_to_pending_gc_completion(
- {remove, MsgId, CRef}, File, State);
- [#file_summary {}] ->
- ok = Dec(),
- delete_file_if_empty(
- File, adjust_valid_total_size(File, -TotalSize,
- State))
- end;
- _ -> ok = Dec(),
- State
- end
- end.
-
add_to_pending_gc_completion(
Op, File, State = #msstate { pending_gc_completion = Pending }) ->
State #msstate { pending_gc_completion =
@@ -1120,6 +1232,11 @@ client_confirm(CRef, MsgIds, ActionTaken, State) ->
end
end, CRef, State).
+blind_confirm(CRef, MsgIds, ActionTaken, State) ->
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTM) -> MsgOnDiskFun(MsgIds, ActionTaken), CTM end,
+ CRef, State).
+
%% Detect whether the MsgId is older or younger than the client's death
%% msg (if there is one). If the msg is older than the client death
%% msg, and it has a 0 ref_count we must only alter the ref_count, not
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index fcfd5557..00d46f5a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1778,7 +1778,7 @@ foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
test_msg_store() ->
restart_msg_store_empty(),
MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)],
- {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds),
+ {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds),
Ref = rabbit_guid:guid(),
{Cap, MSCState} = msg_store_client_init_capture(
?PERSISTENT_MSG_STORE, Ref),
@@ -1789,6 +1789,8 @@ test_msg_store() ->
false = msg_store_contains(false, MsgIds, MSCState),
%% test confirm logic
passed = test_msg_store_confirms([hd(MsgIds)], Cap, MSCState),
+ %% check we don't contain any of the msgs we're about to publish
+ false = msg_store_contains(false, MsgIds, MSCState),
%% publish the first half
ok = msg_store_write(MsgIds1stHalf, MSCState),
%% sync on the first half
@@ -1896,12 +1898,12 @@ test_msg_store() ->
false = msg_store_contains(false, MsgIdsBig, MSCStateM),
MSCStateM
end),
+ %%
+ passed = test_msg_store_client_delete_and_terminate(),
%% restart empty
restart_msg_store_empty(),
passed.
-%% We want to test that writes that get eliminated due to removes still
-%% get confirmed. Removes themselves do not.
test_msg_store_confirms(MsgIds, Cap, MSCState) ->
%% write -> confirmed
ok = msg_store_write(MsgIds, MSCState),
@@ -1927,6 +1929,45 @@ test_msg_store_confirms(MsgIds, Cap, MSCState) ->
ok = msg_store_write(MsgIds, MSCState),
ok = msg_store_remove(MsgIds, MSCState),
ok = on_disk_await(Cap, MsgIds),
+ %% confirmation on timer-based sync
+ passed = test_msg_store_confirm_timer(),
+ passed.
+
+test_msg_store_confirm_timer() ->
+ Ref = rabbit_guid:guid(),
+ MsgId = msg_id_bin(1),
+ Self = self(),
+ MSCState = rabbit_msg_store:client_init(
+ ?PERSISTENT_MSG_STORE, Ref,
+ fun (MsgIds, _ActionTaken) ->
+ case gb_sets:is_member(MsgId, MsgIds) of
+ true -> Self ! on_disk;
+ false -> ok
+ end
+ end, undefined),
+ ok = msg_store_write([MsgId], MSCState),
+ ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState),
+ ok = msg_store_remove([MsgId], MSCState),
+ ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
+ passed.
+
+msg_store_keep_busy_until_confirm(MsgIds, MSCState) ->
+ receive
+ on_disk -> ok
+ after 0 ->
+ ok = msg_store_write(MsgIds, MSCState),
+ ok = msg_store_remove(MsgIds, MSCState),
+ msg_store_keep_busy_until_confirm(MsgIds, MSCState)
+ end.
+
+test_msg_store_client_delete_and_terminate() ->
+ restart_msg_store_empty(),
+ MsgIds = [msg_id_bin(M) || M <- lists:seq(1, 10)],
+ Ref = rabbit_guid:guid(),
+ MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
+ ok = msg_store_write(MsgIds, MSCState),
+ %% test the 'dying client' fast path for writes
+ ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
passed.
queue_name(Name) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 45e852da..63a0927f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1290,7 +1290,7 @@ blind_confirm(Callback, MsgIdSet) ->
Callback(?MODULE,
fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
-msgs_written_to_disk(Callback, MsgIdSet, removed) ->
+msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
blind_confirm(Callback, MsgIdSet);
msgs_written_to_disk(Callback, MsgIdSet, written) ->
Callback(?MODULE,