diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-25 12:54:00 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-25 12:54:00 +0100 |
commit | b42480bc8d1f4798ea1d9fd1917f2f55adb537ea (patch) | |
tree | 024d69546ce390758dbdfbd7f4c7cf3f0cd45c1f | |
parent | 0c86922cded6d5f1a1f482580157f98e1bec10ea (diff) | |
download | rabbitmq-server-bug23181.tar.gz |
Ensure that non-deletion queue-termination does a call via the msg_store to ensure all messages from the queue have made it to the msg_store's mailbox (and have actually also been acted on)bug23181
-rw-r--r-- | src/rabbit_msg_store.erl | 19 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 6 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 8 |
3 files changed, 18 insertions, 15 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index ff248c23..a9c7db76 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,7 +34,7 @@ -behaviour(gen_server2). -export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, - sync/3, client_init/2, client_terminate/1, + sync/3, client_init/2, client_terminate/2, client_delete_and_terminate/3, successfully_recovered_state/1]). -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal @@ -136,7 +136,7 @@ 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(client_init/2 :: (server(), binary()) -> client_msstate()). --spec(client_terminate/1 :: (client_msstate()) -> 'ok'). +-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). -spec(client_delete_and_terminate/3 :: (client_msstate(), server(), binary()) -> 'ok'). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). @@ -373,13 +373,13 @@ client_init(Server, Ref) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }. -client_terminate(CState) -> +client_terminate(CState, Server) -> close_all_handles(CState), - ok. + ok = gen_server2:call(Server, client_terminate, infinity). client_delete_and_terminate(CState, Server, Ref) -> - ok = client_terminate(CState), - ok = gen_server2:cast(Server, {delete_client, Ref}). + close_all_handles(CState), + ok = gen_server2:cast(Server, {client_delete, Ref}). successfully_recovered_state(Server) -> gen_server2:pcall(Server, 7, successfully_recovered_state, infinity). @@ -604,7 +604,10 @@ handle_call({new_client_state, CRef}, _From, State #msstate { client_refs = sets:add_element(CRef, ClientRefs) }); handle_call(successfully_recovered_state, _From, State) -> - reply(State #msstate.successfully_recovered, State). + reply(State #msstate.successfully_recovered, State); + +handle_call(client_terminate, _From, State) -> + reply(ok, State). handle_cast({write, Guid}, State = #msstate { current_file_handle = CurHdl, @@ -721,7 +724,7 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); -handle_cast({delete_client, CRef}, +handle_cast({client_delete, CRef}, State = #msstate { client_refs = ClientRefs }) -> noreply( State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 082e7877..bcbf6e2e 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1514,7 +1514,7 @@ msg_store_remove(Guids) -> foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> rabbit_msg_store:client_terminate( lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end, - rabbit_msg_store:client_init(MsgStore, Ref), L)). + rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore). test_msg_store() -> restart_msg_store_empty(), @@ -1577,7 +1577,7 @@ test_msg_store() -> ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf), %% read the second half again, just for fun (aka code coverage) MSCState7 = msg_store_read(Guids2ndHalf, MSCState6), - ok = rabbit_msg_store:client_terminate(MSCState7), + ok = rabbit_msg_store:client_terminate(MSCState7, ?PERSISTENT_MSG_STORE), %% stop and restart, preserving every other msg in 2nd half ok = rabbit_variable_queue:stop_msg_store(), ok = rabbit_variable_queue:start_msg_store( @@ -1602,7 +1602,7 @@ test_msg_store() -> {ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8), %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( - msg_store_read(Guids1stHalf, MSCState9)), + msg_store_read(Guids1stHalf, MSCState9), ?PERSISTENT_MSG_STORE), ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf), %% restart empty restart_msg_store_empty(), %% now safe to reuse guids diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0f52eee8..30d3a8ae 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -439,9 +439,10 @@ terminate(State) -> remove_pending_ack(true, tx_commit_index(State)), case MSCStateP of undefined -> ok; - _ -> rabbit_msg_store:client_terminate(MSCStateP) + _ -> rabbit_msg_store:client_terminate( + MSCStateP, ?PERSISTENT_MSG_STORE) end, - rabbit_msg_store:client_terminate(MSCStateT), + rabbit_msg_store:client_terminate(MSCStateT, ?TRANSIENT_MSG_STORE), Terms = [{persistent_ref, PRef}, {transient_ref, TRef}, {persistent_count, PCount}], @@ -464,8 +465,7 @@ delete_and_terminate(State) -> case MSCStateP of undefined -> ok; _ -> rabbit_msg_store:client_delete_and_terminate( - MSCStateP, ?PERSISTENT_MSG_STORE, PRef), - rabbit_msg_store:client_terminate(MSCStateP) + MSCStateP, ?PERSISTENT_MSG_STORE, PRef) end, rabbit_msg_store:client_delete_and_terminate( MSCStateT, ?TRANSIENT_MSG_STORE, TRef), |