summaryrefslogtreecommitdiff
path: root/src/couch/src/couch_work_queue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch/src/couch_work_queue.erl')
-rw-r--r--src/couch/src/couch_work_queue.erl106
1 files changed, 46 insertions, 60 deletions
diff --git a/src/couch/src/couch_work_queue.erl b/src/couch/src/couch_work_queue.erl
index 5d747de82..d767a33be 100644
--- a/src/couch/src/couch_work_queue.erl
+++ b/src/couch/src/couch_work_queue.erl
@@ -35,21 +35,17 @@
multi_workers = false
}).
-
new(Options) ->
gen_server:start_link(couch_work_queue, Options, []).
-
queue(Wq, Item) when is_binary(Item) ->
gen_server:call(Wq, {queue, Item, byte_size(Item)}, infinity);
queue(Wq, Item) ->
gen_server:call(Wq, {queue, Item, ?term_size(Item)}, infinity).
-
dequeue(Wq) ->
dequeue(Wq, all).
-
dequeue(Wq, MaxItems) ->
try
gen_server:call(Wq, {dequeue, MaxItems}, infinity)
@@ -57,7 +53,6 @@ dequeue(Wq, MaxItems) ->
_:_ -> closed
end.
-
item_count(Wq) ->
try
gen_server:call(Wq, item_count, infinity)
@@ -65,7 +60,6 @@ item_count(Wq) ->
_:_ -> closed
end.
-
size(Wq) ->
try
gen_server:call(Wq, size, infinity)
@@ -73,10 +67,8 @@ size(Wq) ->
_:_ -> closed
end.
-
close(Wq) ->
gen_server:cast(Wq, close).
-
init(Options) ->
Q = #q{
@@ -86,50 +78,47 @@ init(Options) ->
},
{ok, Q, hibernate}.
-
-terminate(_Reason, #q{work_waiters=Workers}) ->
+terminate(_Reason, #q{work_waiters = Workers}) ->
lists:foreach(fun({W, _}) -> gen_server:reply(W, closed) end, Workers).
-
handle_call({queue, Item, Size}, From, #q{work_waiters = []} = Q0) ->
- Q = Q0#q{size = Q0#q.size + Size,
- items = Q0#q.items + 1,
- queue = queue:in({Item, Size}, Q0#q.queue)},
- case (Q#q.size >= Q#q.max_size) orelse
- (Q#q.items >= Q#q.max_items) of
- true ->
- {noreply, Q#q{blocked = [From | Q#q.blocked]}, hibernate};
- false ->
- {reply, ok, Q, hibernate}
+ Q = Q0#q{
+ size = Q0#q.size + Size,
+ items = Q0#q.items + 1,
+ queue = queue:in({Item, Size}, Q0#q.queue)
+ },
+ case
+ (Q#q.size >= Q#q.max_size) orelse
+ (Q#q.items >= Q#q.max_items)
+ of
+ true ->
+ {noreply, Q#q{blocked = [From | Q#q.blocked]}, hibernate};
+ false ->
+ {reply, ok, Q, hibernate}
end;
-
handle_call({queue, Item, _}, _From, #q{work_waiters = [{W, _Max} | Rest]} = Q) ->
gen_server:reply(W, {ok, [Item]}),
{reply, ok, Q#q{work_waiters = Rest}, hibernate};
-
handle_call({dequeue, Max}, From, Q) ->
#q{work_waiters = Workers, multi_workers = Multi, items = Count} = Q,
case {Workers, Multi} of
- {[_ | _], false} ->
- exit("Only one caller allowed to wait for this work at a time");
- {[_ | _], true} ->
- {noreply, Q#q{work_waiters=Workers ++ [{From, Max}]}};
- _ ->
- case Count of
- 0 ->
- {noreply, Q#q{work_waiters=Workers ++ [{From, Max}]}};
- C when C > 0 ->
- deliver_queue_items(Max, Q)
- end
+ {[_ | _], false} ->
+ exit("Only one caller allowed to wait for this work at a time");
+ {[_ | _], true} ->
+ {noreply, Q#q{work_waiters = Workers ++ [{From, Max}]}};
+ _ ->
+ case Count of
+ 0 ->
+ {noreply, Q#q{work_waiters = Workers ++ [{From, Max}]}};
+ C when C > 0 ->
+ deliver_queue_items(Max, Q)
+ end
end;
-
handle_call(item_count, _From, Q) ->
{reply, Q#q.items, Q};
-
handle_call(size, _From, Q) ->
{reply, Q#q.size, Q}.
-
deliver_queue_items(Max, Q) ->
#q{
queue = Queue,
@@ -139,48 +128,45 @@ deliver_queue_items(Max, Q) ->
blocked = Blocked
} = Q,
case (Max =:= all) orelse (Max >= Count) of
- false ->
- {Items, Size2, Queue2, Blocked2} = dequeue_items(
- Max, Size, Queue, Blocked, []),
- Q2 = Q#q{
- items = Count - Max, size = Size2, blocked = Blocked2, queue = Queue2
- },
- {reply, {ok, Items}, Q2};
- true ->
- lists:foreach(fun(F) -> gen_server:reply(F, ok) end, Blocked),
- Q2 = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()},
- Items = [Item || {Item, _} <- queue:to_list(Queue)],
- case Close of
false ->
+ {Items, Size2, Queue2, Blocked2} = dequeue_items(
+ Max, Size, Queue, Blocked, []
+ ),
+ Q2 = Q#q{
+ items = Count - Max, size = Size2, blocked = Blocked2, queue = Queue2
+ },
{reply, {ok, Items}, Q2};
true ->
- {stop, normal, {ok, Items}, Q2}
- end
+ lists:foreach(fun(F) -> gen_server:reply(F, ok) end, Blocked),
+ Q2 = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()},
+ Items = [Item || {Item, _} <- queue:to_list(Queue)],
+ case Close of
+ false ->
+ {reply, {ok, Items}, Q2};
+ true ->
+ {stop, normal, {ok, Items}, Q2}
+ end
end.
-
dequeue_items(0, Size, Queue, Blocked, DequeuedAcc) ->
{lists:reverse(DequeuedAcc), Size, Queue, Blocked};
-
dequeue_items(NumItems, Size, Queue, Blocked, DequeuedAcc) ->
{{value, {Item, ItemSize}}, Queue2} = queue:out(Queue),
case Blocked of
- [] ->
- Blocked2 = Blocked;
- [From | Blocked2] ->
- gen_server:reply(From, ok)
+ [] ->
+ Blocked2 = Blocked;
+ [From | Blocked2] ->
+ gen_server:reply(From, ok)
end,
dequeue_items(
- NumItems - 1, Size - ItemSize, Queue2, Blocked2, [Item | DequeuedAcc]).
-
+ NumItems - 1, Size - ItemSize, Queue2, Blocked2, [Item | DequeuedAcc]
+ ).
handle_cast(close, #q{items = 0} = Q) ->
{stop, normal, Q};
-
handle_cast(close, Q) ->
{noreply, Q#q{close_on_dequeue = true}}.
-
code_change(_OldVsn, State, _Extra) ->
{ok, State}.