diff options
Diffstat (limited to 'src/couch/test/eunit/couch_work_queue_tests.erl')
-rw-r--r-- | src/couch/test/eunit/couch_work_queue_tests.erl | 402 |
1 files changed, 402 insertions, 0 deletions
diff --git a/src/couch/test/eunit/couch_work_queue_tests.erl b/src/couch/test/eunit/couch_work_queue_tests.erl new file mode 100644 index 000000000..a192230ef --- /dev/null +++ b/src/couch/test/eunit/couch_work_queue_tests.erl @@ -0,0 +1,402 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_work_queue_tests). + +-include_lib("couch/include/couch_eunit.hrl"). + +-define(TIMEOUT, 100). + + +setup(Opts) -> + {ok, Q} = couch_work_queue:new(Opts), + Producer = spawn_producer(Q), + Consumer = spawn_consumer(Q), + {Q, Producer, Consumer}. + +setup_max_items() -> + setup([{max_items, 3}]). + +setup_max_size() -> + setup([{max_size, 160}]). + +setup_max_items_and_size() -> + setup([{max_size, 160}, {max_items, 3}]). + +setup_multi_workers() -> + {Q, Producer, Consumer1} = setup([{max_size, 160}, + {max_items, 3}, + {multi_workers, true}]), + Consumer2 = spawn_consumer(Q), + Consumer3 = spawn_consumer(Q), + {Q, Producer, [Consumer1, Consumer2, Consumer3]}. + +teardown({Q, Producer, Consumers}) when is_list(Consumers) -> + % consume all to unblock and let producer/consumer stop without timeout + [consume(Consumer, all) || Consumer <- Consumers], + + ok = close_queue(Q), + ok = stop(Producer, "producer"), + R = [stop(Consumer, "consumer") || Consumer <- Consumers], + R = [ok || _ <- Consumers], + ok; +teardown({Q, Producer, Consumer}) -> + teardown({Q, Producer, [Consumer]}). + + +single_consumer_test_() -> + { + "Single producer and consumer", + [ + { + "Queue with 3 max items", + { + foreach, + fun setup_max_items/0, fun teardown/1, + single_consumer_max_item_count() ++ common_cases() + } + }, + { + "Queue with max size of 160 bytes", + { + foreach, + fun setup_max_size/0, fun teardown/1, + single_consumer_max_size() ++ common_cases() + } + }, + { + "Queue with max size of 160 bytes and 3 max items", + { + foreach, + fun setup_max_items_and_size/0, fun teardown/1, + single_consumer_max_items_and_size() ++ common_cases() + } + } + ] + }. + +multiple_consumers_test_() -> + { + "Single producer and multiple consumers", + [ + { + "Queue with max size of 160 bytes and 3 max items", + { + foreach, + fun setup_multi_workers/0, fun teardown/1, + common_cases() ++ multiple_consumers() + } + + } + ] + }. + +common_cases()-> + [ + fun should_block_consumer_on_dequeue_from_empty_queue/1, + fun should_consume_right_item/1, + fun should_timeout_on_close_non_empty_queue/1, + fun should_not_block_producer_for_non_empty_queue_after_close/1, + fun should_be_closed/1 + ]. + +single_consumer_max_item_count()-> + [ + fun should_have_no_items_for_new_queue/1, + fun should_block_producer_on_full_queue_count/1, + fun should_receive_first_queued_item/1, + fun should_consume_multiple_items/1, + fun should_consume_all/1 + ]. + +single_consumer_max_size()-> + [ + fun should_have_zero_size_for_new_queue/1, + fun should_block_producer_on_full_queue_size/1, + fun should_increase_queue_size_on_produce/1, + fun should_receive_first_queued_item/1, + fun should_consume_multiple_items/1, + fun should_consume_all/1 + ]. + +single_consumer_max_items_and_size() -> + single_consumer_max_item_count() ++ single_consumer_max_size(). + +multiple_consumers() -> + [ + fun should_have_zero_size_for_new_queue/1, + fun should_have_no_items_for_new_queue/1, + fun should_increase_queue_size_on_produce/1 + ]. + + +should_have_no_items_for_new_queue({Q, _, _}) -> + ?_assertEqual(0, couch_work_queue:item_count(Q)). + +should_have_zero_size_for_new_queue({Q, _, _}) -> + ?_assertEqual(0, couch_work_queue:size(Q)). + +should_block_consumer_on_dequeue_from_empty_queue({_, _, Consumers}) when is_list(Consumers) -> + [consume(C, 2) || C <- Consumers], + Pongs = [ping(C) || C <- Consumers], + ?_assertEqual([timeout, timeout, timeout], Pongs); +should_block_consumer_on_dequeue_from_empty_queue({_, _, Consumer}) -> + consume(Consumer, 1), + Pong = ping(Consumer), + ?_assertEqual(timeout, Pong). + +should_consume_right_item({Q, Producer, Consumers}) when is_list(Consumers) -> + [consume(C, 3) || C <- Consumers], + + Item1 = produce(Q, Producer, 10, false), + ok = ping(Producer), + ?assertEqual(0, couch_work_queue:item_count(Q)), + ?assertEqual(0, couch_work_queue:size(Q)), + + Item2 = produce(Q, Producer, 10, false), + ok = ping(Producer), + ?assertEqual(0, couch_work_queue:item_count(Q)), + ?assertEqual(0, couch_work_queue:size(Q)), + + Item3 = produce(Q, Producer, 10, false), + ok = ping(Producer), + ?assertEqual(0, couch_work_queue:item_count(Q)), + ?assertEqual(0, couch_work_queue:size(Q)), + + R = [{ping(C), Item} + || {C, Item} <- lists:zip(Consumers, [Item1, Item2, Item3])], + + ?_assertEqual([{ok, Item1}, {ok, Item2}, {ok, Item3}], R); +should_consume_right_item({Q, Producer, Consumer}) -> + consume(Consumer, 1), + Item = produce(Q, Producer, 10, false), + produce(Q, Producer, 20, true), + ok = ping(Producer), + ok = ping(Consumer), + {ok, Items} = last_consumer_items(Consumer), + ?_assertEqual([Item], Items). + +should_increase_queue_size_on_produce({Q, Producer, _}) -> + produce(Q, Producer, 50, true), + ok = ping(Producer), + Count1 = couch_work_queue:item_count(Q), + Size1 = couch_work_queue:size(Q), + + produce(Q, Producer, 10, true), + Count2 = couch_work_queue:item_count(Q), + Size2 = couch_work_queue:size(Q), + + ?_assertEqual([{Count1, Size1}, {Count2, Size2}], [{1, 50}, {2, 60}]). + +should_block_producer_on_full_queue_count({Q, Producer, _}) -> + produce(Q, Producer, 10, true), + ?assertEqual(1, couch_work_queue:item_count(Q)), + ok = ping(Producer), + + produce(Q, Producer, 15, true), + ?assertEqual(2, couch_work_queue:item_count(Q)), + ok = ping(Producer), + + produce(Q, Producer, 20, true), + ?assertEqual(3, couch_work_queue:item_count(Q)), + Pong = ping(Producer), + + ?_assertEqual(timeout, Pong). + +should_block_producer_on_full_queue_size({Q, Producer, _}) -> + produce(Q, Producer, 100, true), + ok = ping(Producer), + ?assertEqual(1, couch_work_queue:item_count(Q)), + ?assertEqual(100, couch_work_queue:size(Q)), + + produce(Q, Producer, 110, false), + Pong = ping(Producer), + ?assertEqual(2, couch_work_queue:item_count(Q)), + ?assertEqual(210, couch_work_queue:size(Q)), + + ?_assertEqual(timeout, Pong). + +should_consume_multiple_items({Q, Producer, Consumer}) -> + Item1 = produce(Q, Producer, 10, true), + ok = ping(Producer), + + Item2 = produce(Q, Producer, 15, true), + ok = ping(Producer), + + consume(Consumer, 2), + + {ok, Items} = last_consumer_items(Consumer), + ?_assertEqual([Item1, Item2], Items). + +should_receive_first_queued_item({Q, Producer, Consumer}) -> + consume(Consumer, 100), + timeout = ping(Consumer), + + Item = produce(Q, Producer, 11, false), + ok = ping(Producer), + + ok = ping(Consumer), + ?assertEqual(0, couch_work_queue:item_count(Q)), + + {ok, Items} = last_consumer_items(Consumer), + ?_assertEqual([Item], Items). + +should_consume_all({Q, Producer, Consumer}) -> + Item1 = produce(Q, Producer, 10, true), + Item2 = produce(Q, Producer, 15, true), + Item3 = produce(Q, Producer, 20, true), + + consume(Consumer, all), + + {ok, Items} = last_consumer_items(Consumer), + ?_assertEqual([Item1, Item2, Item3], Items). + +should_timeout_on_close_non_empty_queue({Q, Producer, _}) -> + produce(Q, Producer, 1, true), + Status = close_queue(Q), + + ?_assertEqual(timeout, Status). + +should_not_block_producer_for_non_empty_queue_after_close({Q, Producer, _}) -> + produce(Q, Producer, 1, true), + close_queue(Q), + Pong = ping(Producer), + Size = couch_work_queue:size(Q), + Count = couch_work_queue:item_count(Q), + + ?_assertEqual({ok, 1, 1}, {Pong, Size, Count}). + +should_be_closed({Q, _, Consumers}) when is_list(Consumers) -> + ok = close_queue(Q), + + [consume(C, 1) || C <- Consumers], + + LastConsumerItems = [last_consumer_items(C) || C <- Consumers], + ItemsCount = couch_work_queue:item_count(Q), + Size = couch_work_queue:size(Q), + + ?_assertEqual({[closed, closed, closed], closed, closed}, + {LastConsumerItems, ItemsCount, Size}); +should_be_closed({Q, _, Consumer}) -> + ok = close_queue(Q), + + consume(Consumer, 1), + + LastConsumerItems = last_consumer_items(Consumer), + ItemsCount = couch_work_queue:item_count(Q), + Size = couch_work_queue:size(Q), + + ?_assertEqual({closed, closed, closed}, + {LastConsumerItems, ItemsCount, Size}). + + +close_queue(Q) -> + test_util:stop_sync(Q, fun() -> + ok = couch_work_queue:close(Q) + end, ?TIMEOUT). + +spawn_consumer(Q) -> + Parent = self(), + spawn(fun() -> consumer_loop(Parent, Q, nil) end). + +consumer_loop(Parent, Q, PrevItem) -> + receive + {stop, Ref} -> + Parent ! {ok, Ref}; + {ping, Ref} -> + Parent ! {pong, Ref}, + consumer_loop(Parent, Q, PrevItem); + {last_item, Ref} -> + Parent ! {item, Ref, PrevItem}, + consumer_loop(Parent, Q, PrevItem); + {consume, N} -> + Result = couch_work_queue:dequeue(Q, N), + consumer_loop(Parent, Q, Result) + end. + +spawn_producer(Q) -> + Parent = self(), + spawn(fun() -> producer_loop(Parent, Q) end). + +producer_loop(Parent, Q) -> + receive + {stop, Ref} -> + Parent ! {ok, Ref}; + {ping, Ref} -> + Parent ! {pong, Ref}, + producer_loop(Parent, Q); + {produce, Ref, Size} -> + Item = crypto:strong_rand_bytes(Size), + Parent ! {item, Ref, Item}, + ok = couch_work_queue:queue(Q, Item), + producer_loop(Parent, Q) + end. + +consume(Consumer, N) -> + Consumer ! {consume, N}. + +last_consumer_items(Consumer) -> + Ref = make_ref(), + Consumer ! {last_item, Ref}, + receive + {item, Ref, Items} -> + Items + after ?TIMEOUT -> + timeout + end. + +produce(Q, Producer, Size, Wait) -> + Ref = make_ref(), + ItemsCount = couch_work_queue:item_count(Q), + Producer ! {produce, Ref, Size}, + receive + {item, Ref, Item} when Wait -> + ok = wait_increment(Q, ItemsCount), + Item; + {item, Ref, Item} -> + Item + after ?TIMEOUT -> + erlang:error({assertion_failed, + [{module, ?MODULE}, + {line, ?LINE}, + {reason, "Timeout asking producer to produce an item"}]}) + end. + +ping(Pid) -> + Ref = make_ref(), + Pid ! {ping, Ref}, + receive + {pong, Ref} -> + ok + after ?TIMEOUT -> + timeout + end. + +stop(Pid, Name) -> + Ref = make_ref(), + Pid ! {stop, Ref}, + receive + {ok, Ref} -> ok + after ?TIMEOUT -> + ?debugMsg("Timeout stopping " ++ Name), + timeout + end. + +wait_increment(Q, ItemsCount) -> + test_util:wait(fun() -> + case couch_work_queue:item_count(Q) > ItemsCount of + true -> + ok; + false -> + wait + end + end). |