diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-22 17:16:03 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-22 17:16:03 +0100 |
commit | 4283426a36a31919995c76906a2b7c624a4b4935 (patch) | |
tree | 6f5d39b49e9b8c5480f8e25ab8f1d1d0f343fa67 /src | |
parent | 3a27406558f9a21f85fc3b20e9bbb0c9a97b3e39 (diff) | |
download | rabbitmq-server-4283426a36a31919995c76906a2b7c624a4b4935.tar.gz |
record for pending items
Diffstat (limited to 'src')
-rw-r--r-- | src/file_handle_cache.erl | 46 |
1 files changed, 29 insertions, 17 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 75832309..51b96a68 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -196,6 +196,13 @@ pending_closes }). +-record(pending, + { kind, + pid, + requested, + from + }). + %%---------------------------------------------------------------------------- %% Specs %%---------------------------------------------------------------------------- @@ -795,7 +802,10 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From, clients = Clients }) when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), - Item = {open, Pid, Requested, From}, + Item = #pending { kind = open, + pid = Pid, + requested = Requested, + from = From }, ok = track_client(Pid, Clients), State1 = State #fhc_state { elders = Elders1 }, case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of @@ -822,12 +832,12 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, when Limit =/= infinity andalso Count >= Limit -> ok = track_client(Pid, Clients), true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), - Item = {obtain, Pid, 1, From}, + Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }}; handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, clients = Clients }) -> - Item = {obtain, Pid, 1, From}, + Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, ok = track_client(Pid, Clients), case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of true -> @@ -889,7 +899,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, [#cstate { opened = Opened, obtained = Obtained }] = ets:lookup(Clients, Pid), true = ets:delete(Clients, Pid), - FilterFun = fun ({_Kind, Pid1, _Requested, _From}) -> Pid1 =/= Pid end, + FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end, {noreply, process_pending( State #fhc_state { open_count = OpenCount - Opened, @@ -928,14 +938,15 @@ filter_pending(Fun, {Count, Queue}) -> pending_new() -> {0, queue:new()}. -pending_in(Item, {Count, Queue}) -> - {Count + requested(Item), queue:in(Item, Queue)}. +pending_in(Item = #pending { requested = Requested }, {Count, Queue}) -> + {Count + Requested, queue:in(Item, Queue)}. pending_out({0, _Queue} = Pending) -> {empty, Pending}; pending_out({N, Queue}) -> - {{value, Item} = Result, Queue1} = queue:out(Queue), - {Result, {N - requested(Item), Queue1}}. + {{value, #pending { requested = Requested }} = Result, Queue1} = + queue:out(Queue), + {Result, {N - Requested, Queue1}}. pending_count({Count, _Queue}) -> Count. @@ -987,17 +998,18 @@ process_pending(Pending, Quota, State) -> case pending_out(Pending) of {empty, _Pending} -> {Pending, State}; - {{value, Item}, Pending1} -> - case requested(Item) of - Requested when Requested > Quota -> - {Pending, State}; - Requested -> - process_pending(Pending1, Quota - Requested, - run_pending_item(Item, State)) - end + {{value, #pending { requested = Requested }}, _Pending1} + when Requested > Quota -> + {Pending, State}; + {{value, #pending { requested = Requested } = Item}, Pending1} -> + process_pending(Pending1, Quota - Requested, + run_pending_item(Item, State)) end. -run_pending_item({Kind, Pid, Requested, From}, +run_pending_item(#pending { kind = Kind, + pid = Pid, + requested = Requested, + from = From }, State = #fhc_state { clients = Clients }) -> gen_server:reply(From, ok), true = ets:update_element(Clients, Pid, {#cstate.blocked, false}), |