diff options
Diffstat (limited to 'src/couch/src/couch_work_queue.erl')
-rw-r--r-- | src/couch/src/couch_work_queue.erl | 106 |
1 files changed, 46 insertions, 60 deletions
diff --git a/src/couch/src/couch_work_queue.erl b/src/couch/src/couch_work_queue.erl index 5d747de82..d767a33be 100644 --- a/src/couch/src/couch_work_queue.erl +++ b/src/couch/src/couch_work_queue.erl @@ -35,21 +35,17 @@ multi_workers = false }). - new(Options) -> gen_server:start_link(couch_work_queue, Options, []). - queue(Wq, Item) when is_binary(Item) -> gen_server:call(Wq, {queue, Item, byte_size(Item)}, infinity); queue(Wq, Item) -> gen_server:call(Wq, {queue, Item, ?term_size(Item)}, infinity). - dequeue(Wq) -> dequeue(Wq, all). - dequeue(Wq, MaxItems) -> try gen_server:call(Wq, {dequeue, MaxItems}, infinity) @@ -57,7 +53,6 @@ dequeue(Wq, MaxItems) -> _:_ -> closed end. - item_count(Wq) -> try gen_server:call(Wq, item_count, infinity) @@ -65,7 +60,6 @@ item_count(Wq) -> _:_ -> closed end. - size(Wq) -> try gen_server:call(Wq, size, infinity) @@ -73,10 +67,8 @@ size(Wq) -> _:_ -> closed end. - close(Wq) -> gen_server:cast(Wq, close). - init(Options) -> Q = #q{ @@ -86,50 +78,47 @@ init(Options) -> }, {ok, Q, hibernate}. - -terminate(_Reason, #q{work_waiters=Workers}) -> +terminate(_Reason, #q{work_waiters = Workers}) -> lists:foreach(fun({W, _}) -> gen_server:reply(W, closed) end, Workers). - handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) -> - Q = Q0#q{size = Q0#q.size + Size, - items = Q0#q.items + 1, - queue = queue:in({Item, Size}, Q0#q.queue)}, - case (Q#q.size >= Q#q.max_size) orelse - (Q#q.items >= Q#q.max_items) of - true -> - {noreply, Q#q{blocked = [From | Q#q.blocked]}, hibernate}; - false -> - {reply, ok, Q, hibernate} + Q = Q0#q{ + size = Q0#q.size + Size, + items = Q0#q.items + 1, + queue = queue:in({Item, Size}, Q0#q.queue) + }, + case + (Q#q.size >= Q#q.max_size) orelse + (Q#q.items >= Q#q.max_items) + of + true -> + {noreply, Q#q{blocked = [From | Q#q.blocked]}, hibernate}; + false -> + {reply, ok, Q, hibernate} end; - handle_call({queue, Item, _}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) -> gen_server:reply(W, {ok, [Item]}), {reply, ok, Q#q{work_waiters = Rest}, hibernate}; - handle_call({dequeue, Max}, From, Q) -> #q{work_waiters = Workers, multi_workers = Multi, items = Count} = Q, case {Workers, Multi} of - {[_ | _], false} -> - exit("Only one caller allowed to wait for this work at a time"); - {[_ | _], true} -> - {noreply, Q#q{work_waiters=Workers ++ [{From, Max}]}}; - _ -> - case Count of - 0 -> - {noreply, Q#q{work_waiters=Workers ++ [{From, Max}]}}; - C when C > 0 -> - deliver_queue_items(Max, Q) - end + {[_ | _], false} -> + exit("Only one caller allowed to wait for this work at a time"); + {[_ | _], true} -> + {noreply, Q#q{work_waiters = Workers ++ [{From, Max}]}}; + _ -> + case Count of + 0 -> + {noreply, Q#q{work_waiters = Workers ++ [{From, Max}]}}; + C when C > 0 -> + deliver_queue_items(Max, Q) + end end; - handle_call(item_count, _From, Q) -> {reply, Q#q.items, Q}; - handle_call(size, _From, Q) -> {reply, Q#q.size, Q}. - deliver_queue_items(Max, Q) -> #q{ queue = Queue, @@ -139,48 +128,45 @@ deliver_queue_items(Max, Q) -> blocked = Blocked } = Q, case (Max =:= all) orelse (Max >= Count) of - false -> - {Items, Size2, Queue2, Blocked2} = dequeue_items( - Max, Size, Queue, Blocked, []), - Q2 = Q#q{ - items = Count - Max, size = Size2, blocked = Blocked2, queue = Queue2 - }, - {reply, {ok, Items}, Q2}; - true -> - lists:foreach(fun(F) -> gen_server:reply(F, ok) end, Blocked), - Q2 = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()}, - Items = [Item || {Item, _} <- queue:to_list(Queue)], - case Close of false -> + {Items, Size2, Queue2, Blocked2} = dequeue_items( + Max, Size, Queue, Blocked, [] + ), + Q2 = Q#q{ + items = Count - Max, size = Size2, blocked = Blocked2, queue = Queue2 + }, {reply, {ok, Items}, Q2}; true -> - {stop, normal, {ok, Items}, Q2} - end + lists:foreach(fun(F) -> gen_server:reply(F, ok) end, Blocked), + Q2 = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()}, + Items = [Item || {Item, _} <- queue:to_list(Queue)], + case Close of + false -> + {reply, {ok, Items}, Q2}; + true -> + {stop, normal, {ok, Items}, Q2} + end end. - dequeue_items(0, Size, Queue, Blocked, DequeuedAcc) -> {lists:reverse(DequeuedAcc), Size, Queue, Blocked}; - dequeue_items(NumItems, Size, Queue, Blocked, DequeuedAcc) -> {{value, {Item, ItemSize}}, Queue2} = queue:out(Queue), case Blocked of - [] -> - Blocked2 = Blocked; - [From | Blocked2] -> - gen_server:reply(From, ok) + [] -> + Blocked2 = Blocked; + [From | Blocked2] -> + gen_server:reply(From, ok) end, dequeue_items( - NumItems - 1, Size - ItemSize, Queue2, Blocked2, [Item | DequeuedAcc]). - + NumItems - 1, Size - ItemSize, Queue2, Blocked2, [Item | DequeuedAcc] + ). handle_cast(close, #q{items = 0} = Q) -> {stop, normal, Q}; - handle_cast(close, Q) -> {noreply, Q#q{close_on_dequeue = true}}. - code_change(_OldVsn, State, _Extra) -> {ok, State}. |