summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-08-12 16:34:55 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-08-12 16:34:55 +0100
commita014b2e567576557475eab2b3916a7b1d65ef59f (patch)
tree39fc885d34ca175ccf7228a73adec8ed9d163f7b
parentf5ce49652d4e5c143e356969e9b7bc9e665e617d (diff)
downloadrabbitmq-server-a014b2e567576557475eab2b3916a7b1d65ef59f.tar.gz
If the client is dying, turn the sync into a noop
-rw-r--r--src/rabbit_msg_store.erl24
1 files changed, 16 insertions, 8 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e90e1281..21a499c5 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -441,7 +441,8 @@ contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
remove([], _CState) -> ok;
remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
server_cast(CState, {remove, CRef, MsgIds}).
-sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}).
+sync(MsgIds, K, CState = #client_msstate { client_ref = CRef }) ->
+ server_cast(CState, {sync, CRef, MsgIds, K}).
set_maximum_since_use(Server, Age) ->
gen_server2:cast(Server, {set_maximum_since_use, Age}).
@@ -760,19 +761,26 @@ handle_cast({remove, CRef, MsgIds}, State) ->
noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds),
removed, State1)));
-handle_cast({sync, MsgIds, K},
+handle_cast({sync, CRef, MsgIds, K},
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
- on_sync = Syncs }) ->
- {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl),
- case lists:any(fun (MsgId) ->
+ on_sync = Syncs,
+ dying_clients = DyingClients }) ->
+ case sets:is_element(CRef, DyingClients) of
+ true ->
+ noreply(State);
+ false ->
+ {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl),
+ case lists:any(
+ fun (MsgId) ->
#msg_location { file = File, offset = Offset } =
index_lookup(MsgId, State),
File =:= CurFile andalso Offset >= SyncOffset
end, MsgIds) of
- false -> K(),
- noreply(State);
- true -> noreply(State #msstate { on_sync = [K | Syncs] })
+ false -> K(),
+ noreply(State);
+ true -> noreply(State #msstate { on_sync = [K | Syncs] })
+ end
end;
handle_cast({combine_files, Source, Destination, Reclaimed},