summaryrefslogtreecommitdiff
path: root/src/couch/test/eunit/couch_work_queue_tests.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/couch/test/eunit/couch_work_queue_tests.erl')
-rw-r--r--src/couch/test/eunit/couch_work_queue_tests.erl402
1 files changed, 402 insertions, 0 deletions
diff --git a/src/couch/test/eunit/couch_work_queue_tests.erl b/src/couch/test/eunit/couch_work_queue_tests.erl
new file mode 100644
index 000000000..a192230ef
--- /dev/null
+++ b/src/couch/test/eunit/couch_work_queue_tests.erl
@@ -0,0 +1,402 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_work_queue_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+-define(TIMEOUT, 100).
+
+
+setup(Opts) ->
+ {ok, Q} = couch_work_queue:new(Opts),
+ Producer = spawn_producer(Q),
+ Consumer = spawn_consumer(Q),
+ {Q, Producer, Consumer}.
+
+setup_max_items() ->
+ setup([{max_items, 3}]).
+
+setup_max_size() ->
+ setup([{max_size, 160}]).
+
+setup_max_items_and_size() ->
+ setup([{max_size, 160}, {max_items, 3}]).
+
+setup_multi_workers() ->
+ {Q, Producer, Consumer1} = setup([{max_size, 160},
+ {max_items, 3},
+ {multi_workers, true}]),
+ Consumer2 = spawn_consumer(Q),
+ Consumer3 = spawn_consumer(Q),
+ {Q, Producer, [Consumer1, Consumer2, Consumer3]}.
+
+teardown({Q, Producer, Consumers}) when is_list(Consumers) ->
+ % consume all to unblock and let producer/consumer stop without timeout
+ [consume(Consumer, all) || Consumer <- Consumers],
+
+ ok = close_queue(Q),
+ ok = stop(Producer, "producer"),
+ R = [stop(Consumer, "consumer") || Consumer <- Consumers],
+ R = [ok || _ <- Consumers],
+ ok;
+teardown({Q, Producer, Consumer}) ->
+ teardown({Q, Producer, [Consumer]}).
+
+
+single_consumer_test_() ->
+ {
+ "Single producer and consumer",
+ [
+ {
+ "Queue with 3 max items",
+ {
+ foreach,
+ fun setup_max_items/0, fun teardown/1,
+ single_consumer_max_item_count() ++ common_cases()
+ }
+ },
+ {
+ "Queue with max size of 160 bytes",
+ {
+ foreach,
+ fun setup_max_size/0, fun teardown/1,
+ single_consumer_max_size() ++ common_cases()
+ }
+ },
+ {
+ "Queue with max size of 160 bytes and 3 max items",
+ {
+ foreach,
+ fun setup_max_items_and_size/0, fun teardown/1,
+ single_consumer_max_items_and_size() ++ common_cases()
+ }
+ }
+ ]
+ }.
+
+multiple_consumers_test_() ->
+ {
+ "Single producer and multiple consumers",
+ [
+ {
+ "Queue with max size of 160 bytes and 3 max items",
+ {
+ foreach,
+ fun setup_multi_workers/0, fun teardown/1,
+ common_cases() ++ multiple_consumers()
+ }
+
+ }
+ ]
+ }.
+
+common_cases()->
+ [
+ fun should_block_consumer_on_dequeue_from_empty_queue/1,
+ fun should_consume_right_item/1,
+ fun should_timeout_on_close_non_empty_queue/1,
+ fun should_not_block_producer_for_non_empty_queue_after_close/1,
+ fun should_be_closed/1
+ ].
+
+single_consumer_max_item_count()->
+ [
+ fun should_have_no_items_for_new_queue/1,
+ fun should_block_producer_on_full_queue_count/1,
+ fun should_receive_first_queued_item/1,
+ fun should_consume_multiple_items/1,
+ fun should_consume_all/1
+ ].
+
+single_consumer_max_size()->
+ [
+ fun should_have_zero_size_for_new_queue/1,
+ fun should_block_producer_on_full_queue_size/1,
+ fun should_increase_queue_size_on_produce/1,
+ fun should_receive_first_queued_item/1,
+ fun should_consume_multiple_items/1,
+ fun should_consume_all/1
+ ].
+
+single_consumer_max_items_and_size() ->
+ single_consumer_max_item_count() ++ single_consumer_max_size().
+
+multiple_consumers() ->
+ [
+ fun should_have_zero_size_for_new_queue/1,
+ fun should_have_no_items_for_new_queue/1,
+ fun should_increase_queue_size_on_produce/1
+ ].
+
+
+should_have_no_items_for_new_queue({Q, _, _}) ->
+ ?_assertEqual(0, couch_work_queue:item_count(Q)).
+
+should_have_zero_size_for_new_queue({Q, _, _}) ->
+ ?_assertEqual(0, couch_work_queue:size(Q)).
+
+should_block_consumer_on_dequeue_from_empty_queue({_, _, Consumers}) when is_list(Consumers) ->
+ [consume(C, 2) || C <- Consumers],
+ Pongs = [ping(C) || C <- Consumers],
+ ?_assertEqual([timeout, timeout, timeout], Pongs);
+should_block_consumer_on_dequeue_from_empty_queue({_, _, Consumer}) ->
+ consume(Consumer, 1),
+ Pong = ping(Consumer),
+ ?_assertEqual(timeout, Pong).
+
+should_consume_right_item({Q, Producer, Consumers}) when is_list(Consumers) ->
+ [consume(C, 3) || C <- Consumers],
+
+ Item1 = produce(Q, Producer, 10, false),
+ ok = ping(Producer),
+ ?assertEqual(0, couch_work_queue:item_count(Q)),
+ ?assertEqual(0, couch_work_queue:size(Q)),
+
+ Item2 = produce(Q, Producer, 10, false),
+ ok = ping(Producer),
+ ?assertEqual(0, couch_work_queue:item_count(Q)),
+ ?assertEqual(0, couch_work_queue:size(Q)),
+
+ Item3 = produce(Q, Producer, 10, false),
+ ok = ping(Producer),
+ ?assertEqual(0, couch_work_queue:item_count(Q)),
+ ?assertEqual(0, couch_work_queue:size(Q)),
+
+ R = [{ping(C), Item}
+ || {C, Item} <- lists:zip(Consumers, [Item1, Item2, Item3])],
+
+ ?_assertEqual([{ok, Item1}, {ok, Item2}, {ok, Item3}], R);
+should_consume_right_item({Q, Producer, Consumer}) ->
+ consume(Consumer, 1),
+ Item = produce(Q, Producer, 10, false),
+ produce(Q, Producer, 20, true),
+ ok = ping(Producer),
+ ok = ping(Consumer),
+ {ok, Items} = last_consumer_items(Consumer),
+ ?_assertEqual([Item], Items).
+
+should_increase_queue_size_on_produce({Q, Producer, _}) ->
+ produce(Q, Producer, 50, true),
+ ok = ping(Producer),
+ Count1 = couch_work_queue:item_count(Q),
+ Size1 = couch_work_queue:size(Q),
+
+ produce(Q, Producer, 10, true),
+ Count2 = couch_work_queue:item_count(Q),
+ Size2 = couch_work_queue:size(Q),
+
+ ?_assertEqual([{Count1, Size1}, {Count2, Size2}], [{1, 50}, {2, 60}]).
+
+should_block_producer_on_full_queue_count({Q, Producer, _}) ->
+ produce(Q, Producer, 10, true),
+ ?assertEqual(1, couch_work_queue:item_count(Q)),
+ ok = ping(Producer),
+
+ produce(Q, Producer, 15, true),
+ ?assertEqual(2, couch_work_queue:item_count(Q)),
+ ok = ping(Producer),
+
+ produce(Q, Producer, 20, true),
+ ?assertEqual(3, couch_work_queue:item_count(Q)),
+ Pong = ping(Producer),
+
+ ?_assertEqual(timeout, Pong).
+
+should_block_producer_on_full_queue_size({Q, Producer, _}) ->
+ produce(Q, Producer, 100, true),
+ ok = ping(Producer),
+ ?assertEqual(1, couch_work_queue:item_count(Q)),
+ ?assertEqual(100, couch_work_queue:size(Q)),
+
+ produce(Q, Producer, 110, false),
+ Pong = ping(Producer),
+ ?assertEqual(2, couch_work_queue:item_count(Q)),
+ ?assertEqual(210, couch_work_queue:size(Q)),
+
+ ?_assertEqual(timeout, Pong).
+
+should_consume_multiple_items({Q, Producer, Consumer}) ->
+ Item1 = produce(Q, Producer, 10, true),
+ ok = ping(Producer),
+
+ Item2 = produce(Q, Producer, 15, true),
+ ok = ping(Producer),
+
+ consume(Consumer, 2),
+
+ {ok, Items} = last_consumer_items(Consumer),
+ ?_assertEqual([Item1, Item2], Items).
+
+should_receive_first_queued_item({Q, Producer, Consumer}) ->
+ consume(Consumer, 100),
+ timeout = ping(Consumer),
+
+ Item = produce(Q, Producer, 11, false),
+ ok = ping(Producer),
+
+ ok = ping(Consumer),
+ ?assertEqual(0, couch_work_queue:item_count(Q)),
+
+ {ok, Items} = last_consumer_items(Consumer),
+ ?_assertEqual([Item], Items).
+
+should_consume_all({Q, Producer, Consumer}) ->
+ Item1 = produce(Q, Producer, 10, true),
+ Item2 = produce(Q, Producer, 15, true),
+ Item3 = produce(Q, Producer, 20, true),
+
+ consume(Consumer, all),
+
+ {ok, Items} = last_consumer_items(Consumer),
+ ?_assertEqual([Item1, Item2, Item3], Items).
+
+should_timeout_on_close_non_empty_queue({Q, Producer, _}) ->
+ produce(Q, Producer, 1, true),
+ Status = close_queue(Q),
+
+ ?_assertEqual(timeout, Status).
+
+should_not_block_producer_for_non_empty_queue_after_close({Q, Producer, _}) ->
+ produce(Q, Producer, 1, true),
+ close_queue(Q),
+ Pong = ping(Producer),
+ Size = couch_work_queue:size(Q),
+ Count = couch_work_queue:item_count(Q),
+
+ ?_assertEqual({ok, 1, 1}, {Pong, Size, Count}).
+
+should_be_closed({Q, _, Consumers}) when is_list(Consumers) ->
+ ok = close_queue(Q),
+
+ [consume(C, 1) || C <- Consumers],
+
+ LastConsumerItems = [last_consumer_items(C) || C <- Consumers],
+ ItemsCount = couch_work_queue:item_count(Q),
+ Size = couch_work_queue:size(Q),
+
+ ?_assertEqual({[closed, closed, closed], closed, closed},
+ {LastConsumerItems, ItemsCount, Size});
+should_be_closed({Q, _, Consumer}) ->
+ ok = close_queue(Q),
+
+ consume(Consumer, 1),
+
+ LastConsumerItems = last_consumer_items(Consumer),
+ ItemsCount = couch_work_queue:item_count(Q),
+ Size = couch_work_queue:size(Q),
+
+ ?_assertEqual({closed, closed, closed},
+ {LastConsumerItems, ItemsCount, Size}).
+
+
+close_queue(Q) ->
+ test_util:stop_sync(Q, fun() ->
+ ok = couch_work_queue:close(Q)
+ end, ?TIMEOUT).
+
+spawn_consumer(Q) ->
+ Parent = self(),
+ spawn(fun() -> consumer_loop(Parent, Q, nil) end).
+
+consumer_loop(Parent, Q, PrevItem) ->
+ receive
+ {stop, Ref} ->
+ Parent ! {ok, Ref};
+ {ping, Ref} ->
+ Parent ! {pong, Ref},
+ consumer_loop(Parent, Q, PrevItem);
+ {last_item, Ref} ->
+ Parent ! {item, Ref, PrevItem},
+ consumer_loop(Parent, Q, PrevItem);
+ {consume, N} ->
+ Result = couch_work_queue:dequeue(Q, N),
+ consumer_loop(Parent, Q, Result)
+ end.
+
+spawn_producer(Q) ->
+ Parent = self(),
+ spawn(fun() -> producer_loop(Parent, Q) end).
+
+producer_loop(Parent, Q) ->
+ receive
+ {stop, Ref} ->
+ Parent ! {ok, Ref};
+ {ping, Ref} ->
+ Parent ! {pong, Ref},
+ producer_loop(Parent, Q);
+ {produce, Ref, Size} ->
+ Item = crypto:strong_rand_bytes(Size),
+ Parent ! {item, Ref, Item},
+ ok = couch_work_queue:queue(Q, Item),
+ producer_loop(Parent, Q)
+ end.
+
+consume(Consumer, N) ->
+ Consumer ! {consume, N}.
+
+last_consumer_items(Consumer) ->
+ Ref = make_ref(),
+ Consumer ! {last_item, Ref},
+ receive
+ {item, Ref, Items} ->
+ Items
+ after ?TIMEOUT ->
+ timeout
+ end.
+
+produce(Q, Producer, Size, Wait) ->
+ Ref = make_ref(),
+ ItemsCount = couch_work_queue:item_count(Q),
+ Producer ! {produce, Ref, Size},
+ receive
+ {item, Ref, Item} when Wait ->
+ ok = wait_increment(Q, ItemsCount),
+ Item;
+ {item, Ref, Item} ->
+ Item
+ after ?TIMEOUT ->
+ erlang:error({assertion_failed,
+ [{module, ?MODULE},
+ {line, ?LINE},
+ {reason, "Timeout asking producer to produce an item"}]})
+ end.
+
+ping(Pid) ->
+ Ref = make_ref(),
+ Pid ! {ping, Ref},
+ receive
+ {pong, Ref} ->
+ ok
+ after ?TIMEOUT ->
+ timeout
+ end.
+
+stop(Pid, Name) ->
+ Ref = make_ref(),
+ Pid ! {stop, Ref},
+ receive
+ {ok, Ref} -> ok
+ after ?TIMEOUT ->
+ ?debugMsg("Timeout stopping " ++ Name),
+ timeout
+ end.
+
+wait_increment(Q, ItemsCount) ->
+ test_util:wait(fun() ->
+ case couch_work_queue:item_count(Q) > ItemsCount of
+ true ->
+ ok;
+ false ->
+ wait
+ end
+ end).