summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-25 12:54:00 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-25 12:54:00 +0100
commitb42480bc8d1f4798ea1d9fd1917f2f55adb537ea (patch)
tree024d69546ce390758dbdfbd7f4c7cf3f0cd45c1f
parent0c86922cded6d5f1a1f482580157f98e1bec10ea (diff)
downloadrabbitmq-server-bug23181.tar.gz
Ensure that non-deletion queue-termination does a call via the msg_store to ensure all messages from the queue have made it to the msg_store's mailbox (and have actually also been acted on)bug23181
-rw-r--r--src/rabbit_msg_store.erl19
-rw-r--r--src/rabbit_tests.erl6
-rw-r--r--src/rabbit_variable_queue.erl8
3 files changed, 18 insertions, 15 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index ff248c23..a9c7db76 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,7 +34,7 @@
-behaviour(gen_server2).
-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
- sync/3, client_init/2, client_terminate/1,
+ sync/3, client_init/2, client_terminate/2,
client_delete_and_terminate/3, successfully_recovered_state/1]).
-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
@@ -136,7 +136,7 @@
'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(client_init/2 :: (server(), binary()) -> client_msstate()).
--spec(client_terminate/1 :: (client_msstate()) -> 'ok').
+-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok').
-spec(client_delete_and_terminate/3 ::
(client_msstate(), server(), binary()) -> 'ok').
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
@@ -373,13 +373,13 @@ client_init(Server, Ref) ->
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }.
-client_terminate(CState) ->
+client_terminate(CState, Server) ->
close_all_handles(CState),
- ok.
+ ok = gen_server2:call(Server, client_terminate, infinity).
client_delete_and_terminate(CState, Server, Ref) ->
- ok = client_terminate(CState),
- ok = gen_server2:cast(Server, {delete_client, Ref}).
+ close_all_handles(CState),
+ ok = gen_server2:cast(Server, {client_delete, Ref}).
successfully_recovered_state(Server) ->
gen_server2:pcall(Server, 7, successfully_recovered_state, infinity).
@@ -604,7 +604,10 @@ handle_call({new_client_state, CRef}, _From,
State #msstate { client_refs = sets:add_element(CRef, ClientRefs) });
handle_call(successfully_recovered_state, _From, State) ->
- reply(State #msstate.successfully_recovered, State).
+ reply(State #msstate.successfully_recovered, State);
+
+handle_call(client_terminate, _From, State) ->
+ reply(ok, State).
handle_cast({write, Guid},
State = #msstate { current_file_handle = CurHdl,
@@ -721,7 +724,7 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast({delete_client, CRef},
+handle_cast({client_delete, CRef},
State = #msstate { client_refs = ClientRefs }) ->
noreply(
State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 082e7877..bcbf6e2e 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1514,7 +1514,7 @@ msg_store_remove(Guids) ->
foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
rabbit_msg_store:client_terminate(
lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end,
- rabbit_msg_store:client_init(MsgStore, Ref), L)).
+ rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore).
test_msg_store() ->
restart_msg_store_empty(),
@@ -1577,7 +1577,7 @@ test_msg_store() ->
ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf),
%% read the second half again, just for fun (aka code coverage)
MSCState7 = msg_store_read(Guids2ndHalf, MSCState6),
- ok = rabbit_msg_store:client_terminate(MSCState7),
+ ok = rabbit_msg_store:client_terminate(MSCState7, ?PERSISTENT_MSG_STORE),
%% stop and restart, preserving every other msg in 2nd half
ok = rabbit_variable_queue:stop_msg_store(),
ok = rabbit_variable_queue:start_msg_store(
@@ -1602,7 +1602,7 @@ test_msg_store() ->
{ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8),
%% this should force some sort of sync internally otherwise misread
ok = rabbit_msg_store:client_terminate(
- msg_store_read(Guids1stHalf, MSCState9)),
+ msg_store_read(Guids1stHalf, MSCState9), ?PERSISTENT_MSG_STORE),
ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf),
%% restart empty
restart_msg_store_empty(), %% now safe to reuse guids
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0f52eee8..30d3a8ae 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -439,9 +439,10 @@ terminate(State) ->
remove_pending_ack(true, tx_commit_index(State)),
case MSCStateP of
undefined -> ok;
- _ -> rabbit_msg_store:client_terminate(MSCStateP)
+ _ -> rabbit_msg_store:client_terminate(
+ MSCStateP, ?PERSISTENT_MSG_STORE)
end,
- rabbit_msg_store:client_terminate(MSCStateT),
+ rabbit_msg_store:client_terminate(MSCStateT, ?TRANSIENT_MSG_STORE),
Terms = [{persistent_ref, PRef},
{transient_ref, TRef},
{persistent_count, PCount}],
@@ -464,8 +465,7 @@ delete_and_terminate(State) ->
case MSCStateP of
undefined -> ok;
_ -> rabbit_msg_store:client_delete_and_terminate(
- MSCStateP, ?PERSISTENT_MSG_STORE, PRef),
- rabbit_msg_store:client_terminate(MSCStateP)
+ MSCStateP, ?PERSISTENT_MSG_STORE, PRef)
end,
rabbit_msg_store:client_delete_and_terminate(
MSCStateT, ?TRANSIENT_MSG_STORE, TRef),