diff options
author | Nicholas Clark <nick@ccl4.org> | 2009-09-28 17:00:50 +0100 |
---|---|---|
committer | Nicholas Clark <nick@ccl4.org> | 2009-09-29 11:12:38 +0100 |
commit | e4c73f034798b83906105bd2b28ce3805d0fa348 (patch) | |
tree | f2674de541e0aa86361691763314e28adf933f29 /dist/Thread-Queue | |
parent | 51bde7eebd4cedd091a03b9c38593f8e945fb4a9 (diff) | |
download | perl-e4c73f034798b83906105bd2b28ce3805d0fa348.tar.gz |
Move Thread::Queue from ext/ to dist/
Diffstat (limited to 'dist/Thread-Queue')
-rw-r--r-- | dist/Thread-Queue/lib/Thread/Queue.pm | 481 | ||||
-rw-r--r-- | dist/Thread-Queue/t/01_basic.t | 134 | ||||
-rw-r--r-- | dist/Thread-Queue/t/02_refs.t | 189 | ||||
-rw-r--r-- | dist/Thread-Queue/t/03_peek.t | 56 | ||||
-rw-r--r-- | dist/Thread-Queue/t/04_errs.t | 75 | ||||
-rw-r--r-- | dist/Thread-Queue/t/05_extract.t | 78 | ||||
-rw-r--r-- | dist/Thread-Queue/t/06_insert.t | 106 | ||||
-rw-r--r-- | dist/Thread-Queue/t/07_lock.t | 56 | ||||
-rw-r--r-- | dist/Thread-Queue/t/08_nothreads.t | 114 |
9 files changed, 1289 insertions, 0 deletions
diff --git a/dist/Thread-Queue/lib/Thread/Queue.pm b/dist/Thread-Queue/lib/Thread/Queue.pm new file mode 100644 index 0000000000..631edf126a --- /dev/null +++ b/dist/Thread-Queue/lib/Thread/Queue.pm @@ -0,0 +1,481 @@ +package Thread::Queue; + +use strict; +use warnings; + +our $VERSION = '2.11'; + +use threads::shared 1.21; +use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr); + +# Carp errors from threads::shared calls should complain about caller +our @CARP_NOT = ("threads::shared"); + +# Predeclarations for internal functions +my ($validate_count, $validate_index); + +# Create a new queue possibly pre-populated with items +sub new +{ + my $class = shift; + my @queue :shared = map { shared_clone($_) } @_; + return bless(\@queue, $class); +} + +# Add items to the tail of a queue +sub enqueue +{ + my $queue = shift; + lock(@$queue); + push(@$queue, map { shared_clone($_) } @_) + and cond_signal(@$queue); +} + +# Return a count of the number of items on a queue +sub pending +{ + my $queue = shift; + lock(@$queue); + return scalar(@$queue); +} + +# Return 1 or more items from the head of a queue, blocking if needed +sub dequeue +{ + my $queue = shift; + lock(@$queue); + + my $count = @_ ? $validate_count->(shift) : 1; + + # Wait for requisite number of items + cond_wait(@$queue) until (@$queue >= $count); + cond_signal(@$queue) if (@$queue > $count); + + # Return single item + return shift(@$queue) if ($count == 1); + + # Return multiple items + my @items; + push(@items, shift(@$queue)) for (1..$count); + return @items; +} + +# Return items from the head of a queue with no blocking +sub dequeue_nb +{ + my $queue = shift; + lock(@$queue); + + my $count = @_ ? $validate_count->(shift) : 1; + + # Return single item + return shift(@$queue) if ($count == 1); + + # Return multiple items + my @items; + for (1..$count) { + last if (! @$queue); + push(@items, shift(@$queue)); + } + return @items; +} + +# Return an item without removing it from a queue +sub peek +{ + my $queue = shift; + lock(@$queue); + my $index = @_ ? $validate_index->(shift) : 0; + return $$queue[$index]; +} + +# Insert items anywhere into a queue +sub insert +{ + my $queue = shift; + lock(@$queue); + + my $index = $validate_index->(shift); + + return if (! @_); # Nothing to insert + + # Support negative indices + if ($index < 0) { + $index += @$queue; + if ($index < 0) { + $index = 0; + } + } + + # Dequeue items from $index onward + my @tmp; + while (@$queue > $index) { + unshift(@tmp, pop(@$queue)) + } + + # Add new items to the queue + push(@$queue, map { shared_clone($_) } @_); + + # Add previous items back onto the queue + push(@$queue, @tmp); + + # Soup's up + cond_signal(@$queue); +} + +# Remove items from anywhere in a queue +sub extract +{ + my $queue = shift; + lock(@$queue); + + my $index = @_ ? $validate_index->(shift) : 0; + my $count = @_ ? $validate_count->(shift) : 1; + + # Support negative indices + if ($index < 0) { + $index += @$queue; + if ($index < 0) { + $count += $index; + return if ($count <= 0); # Beyond the head of the queue + return $queue->dequeue_nb($count); # Extract from the head + } + } + + # Dequeue items from $index+$count onward + my @tmp; + while (@$queue > ($index+$count)) { + unshift(@tmp, pop(@$queue)) + } + + # Extract desired items + my @items; + unshift(@items, pop(@$queue)) while (@$queue > $index); + + # Add back any removed items + push(@$queue, @tmp); + + # Return single item + return $items[0] if ($count == 1); + + # Return multiple items + return @items; +} + +### Internal Functions ### + +# Check value of the requested index +$validate_index = sub { + my $index = shift; + + if (! defined($index) || + ! looks_like_number($index) || + (int($index) != $index)) + { + require Carp; + my ($method) = (caller(1))[3]; + $method =~ s/Thread::Queue:://; + $index = 'undef' if (! defined($index)); + Carp::croak("Invalid 'index' argument ($index) to '$method' method"); + } + + return $index; +}; + +# Check value of the requested count +$validate_count = sub { + my $count = shift; + + if (! defined($count) || + ! looks_like_number($count) || + (int($count) != $count) || + ($count < 1)) + { + require Carp; + my ($method) = (caller(1))[3]; + $method =~ s/Thread::Queue:://; + $count = 'undef' if (! defined($count)); + Carp::croak("Invalid 'count' argument ($count) to '$method' method"); + } + + return $count; +}; + +1; + +=head1 NAME + +Thread::Queue - Thread-safe queues + +=head1 VERSION + +This document describes Thread::Queue version 2.11 + +=head1 SYNOPSIS + + use strict; + use warnings; + + use threads; + use Thread::Queue; + + my $q = Thread::Queue->new(); # A new empty queue + + # Worker thread + my $thr = threads->create(sub { + while (my $item = $q->dequeue()) { + # Do work on $item + } + })->detach(); + + # Send work to the thread + $q->enqueue($item1, ...); + + + # Count of items in the queue + my $left = $q->pending(); + + # Non-blocking dequeue + if (defined(my $item = $q->dequeue_nb())) { + # Work on $item + } + + # Get the second item in the queue without dequeuing anything + my $item = $q->peek(1); + + # Insert two items into the queue just behind the head + $q->insert(1, $item1, $item2); + + # Extract the last two items on the queue + my ($item1, $item2) = $q->extract(-2, 2); + +=head1 DESCRIPTION + +This module provides thread-safe FIFO queues that can be accessed safely by +any number of threads. + +Any data types supported by L<threads::shared> can be passed via queues: + +=over + +=item Ordinary scalars + +=item Array refs + +=item Hash refs + +=item Scalar refs + +=item Objects based on the above + +=back + +Ordinary scalars are added to queues as they are. + +If not already thread-shared, the other complex data types will be cloned +(recursively, if needed, and including any C<bless>ings and read-only +settings) into thread-shared structures before being placed onto a queue. + +For example, the following would cause L<Thread::Queue> to create a empty, +shared array reference via C<&shared([])>, copy the elements 'foo', 'bar' +and 'baz' from C<@ary> into it, and then place that shared reference onto +the queue: + + my @ary = qw/foo bar baz/; + $q->enqueue(\@ary); + +However, for the following, the items are already shared, so their references +are added directly to the queue, and no cloning takes place: + + my @ary :shared = qw/foo bar baz/; + $q->enqueue(\@ary); + + my $obj = &shared({}); + $$obj{'foo'} = 'bar'; + $$obj{'qux'} = 99; + bless($obj, 'My::Class'); + $q->enqueue($obj); + +See L</"LIMITATIONS"> for caveats related to passing objects via queues. + +=head1 QUEUE CREATION + +=over + +=item ->new() + +Creates a new empty queue. + +=item ->new(LIST) + +Creates a new queue pre-populated with the provided list of items. + +=back + +=head1 BASIC METHODS + +The following methods deal with queues on a FIFO basis. + +=over + +=item ->enqueue(LIST) + +Adds a list of items onto the end of the queue. + +=item ->dequeue() + +=item ->dequeue(COUNT) + +Removes the requested number of items (default is 1) from the head of the +queue, and returns them. If the queue contains fewer than the requested +number of items, then the thread will be blocked until the requisite number +of items are available (i.e., until other threads <enqueue> more items). + +=item ->dequeue_nb() + +=item ->dequeue_nb(COUNT) + +Removes the requested number of items (default is 1) from the head of the +queue, and returns them. If the queue contains fewer than the requested +number of items, then it immediately (i.e., non-blocking) returns whatever +items there are on the queue. If the queue is empty, then C<undef> is +returned. + +=item ->pending() + +Returns the number of items still in the queue. + +=back + +=head1 ADVANCED METHODS + +The following methods can be used to manipulate items anywhere in a queue. + +To prevent the contents of a queue from being modified by another thread +while it is being examined and/or changed, L<lock|threads::shared/"lock +VARIABLE"> the queue inside a local block: + + { + lock($q); # Keep other threads from changing the queue's contents + my $item = $q->peek(); + if ($item ...) { + ... + } + } + # Queue is now unlocked + +=over + +=item ->peek() + +=item ->peek(INDEX) + +Returns an item from the queue without dequeuing anything. Defaults to the +the head of queue (at index position 0) if no index is specified. Negative +index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1 +is the end of the queue, -2 is next to last, and so on). + +If no items exists at the specified index (i.e., the queue is empty, or the +index is beyond the number of items on the queue), then C<undef> is returned. + +Remember, the returned item is not removed from the queue, so manipulating a +C<peek>ed at reference affects the item on the queue. + +=item ->insert(INDEX, LIST) + +Adds the list of items to the queue at the specified index position (0 +is the head of the list). Any existing items at and beyond that position are +pushed back past the newly added items: + + $q->enqueue(1, 2, 3, 4); + $q->insert(1, qw/foo bar/); + # Queue now contains: 1, foo, bar, 2, 3, 4 + +Specifying an index position greater than the number of items in the queue +just adds the list to the end. + +Negative index positions are supported: + + $q->enqueue(1, 2, 3, 4); + $q->insert(-2, qw/foo bar/); + # Queue now contains: 1, 2, foo, bar, 3, 4 + +Specifying a negative index position greater than the number of items in the +queue adds the list to the head of the queue. + +=item ->extract() + +=item ->extract(INDEX) + +=item ->extract(INDEX, COUNT) + +Removes and returns the specified number of items (defaults to 1) from the +specified index position in the queue (0 is the head of the queue). When +called with no arguments, C<extract> operates the same as C<dequeue_nb>. + +This method is non-blocking, and will return only as many items as are +available to fulfill the request: + + $q->enqueue(1, 2, 3, 4); + my $item = $q->extract(2) # Returns 3 + # Queue now contains: 1, 2, 4 + my @items = $q->extract(1, 3) # Returns (2, 4) + # Queue now contains: 1 + +Specifying an index position greater than the number of items in the +queue results in C<undef> or an empty list being returned. + + $q->enqueue('foo'); + my $nada = $q->extract(3) # Returns undef + my @nada = $q->extract(1, 3) # Returns () + +Negative index positions are supported. Specifying a negative index position +greater than the number of items in the queue may return items from the head +of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the +queue from the specified position (i.e. if queue size + index + count is +greater than zero): + + $q->enqueue(qw/foo bar baz/); + my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0 + my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0 + # Queue now contains: bar, baz + my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0 + +=back + +=head1 NOTES + +Queues created by L<Thread::Queue> can be used in both threaded and +non-threaded applications. + +=head1 LIMITATIONS + +Passing objects on queues may not work if the objects' classes do not support +sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more. + +Passing array/hash refs that contain objects may not work for Perl prior to +5.10.0. + +=head1 SEE ALSO + +Thread::Queue Discussion Forum on CPAN: +L<http://www.cpanforum.com/dist/Thread-Queue> + +Annotated POD for Thread::Queue: +L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.11/lib/Thread/Queue.pm> + +Source repository: +L<http://code.google.com/p/thread-queue/> + +L<threads>, L<threads::shared> + +=head1 MAINTAINER + +Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>> + +=head1 LICENSE + +This program is free software; you can redistribute it and/or modify it under +the same terms as Perl itself. + +=cut diff --git a/dist/Thread-Queue/t/01_basic.t b/dist/Thread-Queue/t/01_basic.t new file mode 100644 index 0000000000..6a0d8387e8 --- /dev/null +++ b/dist/Thread-Queue/t/01_basic.t @@ -0,0 +1,134 @@ +use strict; +use warnings; + +BEGIN { + if ($ENV{'PERL_CORE'}){ + chdir('t'); + unshift(@INC, '../lib'); + } + use Config; + if (! $Config{'useithreads'}) { + print("1..0 # SKIP Perl not compiled with 'useithreads'\n"); + exit(0); + } +} + +use threads; +use Thread::Queue; + +if ($] == 5.008) { + require 't/test.pl'; # Test::More work-alike for Perl 5.8.0 +} else { + require Test::More; +} +Test::More->import(); +plan('tests' => 81); + +### Basic usage with multiple threads ### + +my $nthreads = 5; + +my $q = Thread::Queue->new(1..$nthreads); +ok($q, 'New queue'); +is($q->pending(), $nthreads, 'Pre-populated queue count'); + +sub reader { + my $id = threads->tid(); + while ((my $el = $q->dequeue()) != -1) { + ok($el >= 1, "Thread $id got $el"); + select(undef, undef, undef, rand(1)); + } + ok(1, "Thread $id done"); +} + +my @threads; +push(@threads, threads->create('reader')) for (1..$nthreads); + +for (1..20) { + select(undef, undef, undef, rand(1)); + $q->enqueue($_); +} + +$q->enqueue((-1) x $nthreads); # One end marker for each thread + +$_->join() foreach @threads; +undef(@threads); + +is($q->pending(), 0, 'Empty queue'); + + +### ->dequeue_nb() test ### + +$q = Thread::Queue->new(); +ok($q, 'New queue'); +is($q->pending(), 0, 'Empty queue'); + +my @items = qw/foo bar baz/; +$q->enqueue(@items); + +threads->create(sub { + is($q->pending(), scalar(@items), 'Queue count in thread'); + while (my $el = $q->dequeue_nb()) { + is($el, shift(@items), "Thread got $el"); + } + is($q->pending(), 0, 'Empty queue'); + $q->enqueue('done'); +})->join(); + +is($q->pending(), 1, 'Queue count after thread'); +is($q->dequeue(), 'done', 'Thread reported done'); +is($q->pending(), 0, 'Empty queue'); + + +### ->dequeue(COUNT) test ### + +my $count = 3; + +sub reader2 { + my $id = threads->tid(); + while (1) { + my @el = $q->dequeue($count); + is(scalar(@el), $count, "Thread $id got @el"); + select(undef, undef, undef, rand(1)); + return if ($el[0] == 0); + } +} + +push(@threads, threads->create('reader2')) for (1..$nthreads); + +$q->enqueue(1..4*$count*$nthreads); +$q->enqueue((0) x ($count*$nthreads)); + +$_->join() foreach @threads; +undef(@threads); + +is($q->pending(), 0, 'Empty queue'); + + +### ->dequeue_nb(COUNT) test ### + +@items = qw/foo bar baz qux exit/; +$q->enqueue(@items); +is($q->pending(), scalar(@items), 'Queue count'); + +threads->create(sub { + is($q->pending(), scalar(@items), 'Queue count in thread'); + while (my @el = $q->dequeue_nb(2)) { + is($el[0], shift(@items), "Thread got $el[0]"); + if ($el[0] eq 'exit') { + is(scalar(@el), 1, 'Thread to exit'); + } else { + is($el[1], shift(@items), "Thread got $el[1]"); + } + } + is($q->pending(), 0, 'Empty queue'); + $q->enqueue('done'); +})->join(); + +is($q->pending(), 1, 'Queue count after thread'); +is($q->dequeue(), 'done', 'Thread reported done'); +is($q->pending(), 0, 'Empty queue'); + +exit(0); + +# EOF diff --git a/dist/Thread-Queue/t/02_refs.t b/dist/Thread-Queue/t/02_refs.t new file mode 100644 index 0000000000..3a59b5e3b6 --- /dev/null +++ b/dist/Thread-Queue/t/02_refs.t @@ -0,0 +1,189 @@ +use strict; +use warnings; + +BEGIN { + if ($ENV{'PERL_CORE'}){ + chdir('t'); + unshift(@INC, '../lib'); + } + use Config; + if (! $Config{'useithreads'}) { + print("1..0 # SKIP Perl not compiled with 'useithreads'\n"); + exit(0); + } +} + +use threads; +use threads::shared; +use Thread::Queue; + +if ($] == 5.008) { + require 't/test.pl'; # Test::More work-alike for Perl 5.8.0 +} else { + require Test::More; +} +Test::More->import(); +plan('tests' => 46); + +# Regular array +my @ary1 = qw/foo bar baz/; +push(@ary1, [ 1..3 ], { 'qux' => 99 }); + +# Shared array +my @ary2 :shared = (99, 21, 86); + +# Regular hash-based object +my $obj1 = { + 'foo' => 'bar', + 'qux' => 99, + 'biff' => [ qw/fee fi fo/ ], + 'boff' => { 'bork' => 'true' }, +}; +bless($obj1, 'Foo'); + +# Shared hash-based object +my $obj2 = &share({}); +$$obj2{'bar'} = 86; +$$obj2{'key'} = 'foo'; +bless($obj2, 'Bar'); + +# Scalar ref +my $sref1 = \do{ my $scalar = 'foo'; }; + +# Shared scalar ref object +my $sref2 = \do{ my $scalar = 69; }; +share($sref2); +bless($sref2, 'Baz'); + +# Ref of ref +my $foo = [ 5, 'bork', { 'now' => 123 } ]; +my $bar = \$foo; +my $baz = \$bar; +my $qux = \$baz; +is_deeply($$$$qux, $foo, 'Ref of ref'); + +# Circular refs +my $cir1; +$cir1 = \$cir1; + +my $cir1s : shared; +$cir1s = \$cir1s; + +my $cir2; +$cir2 = [ \$cir2, { 'ref' => \$cir2 } ]; + +my $cir3 :shared = &share({}); +$cir3->{'self'} = \$cir3; +bless($cir3, 'Circular'); + +# Queue up items +my $q = Thread::Queue->new(\@ary1, \@ary2); +ok($q, 'New queue'); +is($q->pending(), 2, 'Queue count'); +$q->enqueue($obj1, $obj2); +is($q->pending(), 4, 'Queue count'); +$q->enqueue($sref1, $sref2, $foo, $qux); +is($q->pending(), 8, 'Queue count'); +$q->enqueue($cir1, $cir1s, $cir2, $cir3); +is($q->pending(), 12, 'Queue count'); + +# Process items in thread +threads->create(sub { + is($q->pending(), 12, 'Queue count in thread'); + + my $tary1 = $q->dequeue(); + ok($tary1, 'Thread got item'); + is(ref($tary1), 'ARRAY', 'Item is array ref'); + is_deeply($tary1, \@ary1, 'Complex array'); + $$tary1[1] = 123; + + my $tary2 = $q->dequeue(); + ok($tary2, 'Thread got item'); + is(ref($tary2), 'ARRAY', 'Item is array ref'); + for (my $ii=0; $ii < @ary2; $ii++) { + is($$tary2[$ii], $ary2[$ii], 'Shared array element check'); + } + $$tary2[1] = 444; + + my $tobj1 = $q->dequeue(); + ok($tobj1, 'Thread got item'); + is(ref($tobj1), 'Foo', 'Item is object'); + is_deeply($tobj1, $obj1, 'Object comparison'); + $$tobj1{'foo'} = '.|.'; + $$tobj1{'smiley'} = ':)'; + + my $tobj2 = $q->dequeue(); + ok($tobj2, 'Thread got item'); + is(ref($tobj2), 'Bar', 'Item is object'); + is($$tobj2{'bar'}, 86, 'Shared object element check'); + is($$tobj2{'key'}, 'foo', 'Shared object element check'); + $$tobj2{'tick'} = 'tock'; + $$tobj2{'frowny'} = ':('; + + my $tsref1 = $q->dequeue(); + ok($tsref1, 'Thread got item'); + is(ref($tsref1), 'SCALAR', 'Item is scalar ref'); + is($$tsref1, 'foo', 'Scalar ref contents'); + $$tsref1 = 0; + + my $tsref2 = $q->dequeue(); + ok($tsref2, 'Thread got item'); + is(ref($tsref2), 'Baz', 'Item is object'); + is($$tsref2, 69, 'Shared scalar ref contents'); + $$tsref2 = 'zzz'; + + my $myfoo = $q->dequeue(); + is_deeply($myfoo, $foo, 'Array ref'); + + my $qux = $q->dequeue(); + is_deeply($$$$qux, $foo, 'Ref of ref'); + + my ($c1, $c1s, $c2, $c3) = $q->dequeue(4); + SKIP: { + skip("Needs threads::shared >= 1.19", 5) + if ($threads::shared::VERSION < 1.19); + + is(threads::shared::_id($$c1), + threads::shared::_id($c1), + 'Circular ref - scalar'); + + is(threads::shared::_id($$c1s), + threads::shared::_id($c1s), + 'Circular ref - shared scalar'); + + is(threads::shared::_id(${$c2->[0]}), + threads::shared::_id($c2), + 'Circular ref - array'); + + is(threads::shared::_id(${$c2->[1]->{'ref'}}), + threads::shared::_id($c2), + 'Circular ref - mixed'); + + is(threads::shared::_id(${$c3->{'self'}}), + threads::shared::_id($c3), + 'Circular ref - hash'); + } + + is($q->pending(), 0, 'Empty queue'); + my $nothing = $q->dequeue_nb(); + ok(! defined($nothing), 'Nothing on queue'); +})->join(); + +# Check results of thread's activities +is($q->pending(), 0, 'Empty queue'); + +is($ary1[1], 'bar', 'Array unchanged'); +is($ary2[1], 444, 'Shared array changed'); + +is($$obj1{'foo'}, 'bar', 'Object unchanged'); +ok(! exists($$obj1{'smiley'}), 'Object unchanged'); + +is($$obj2{'tick'}, 'tock', 'Shared object changed'); +is($$obj2{'frowny'}, ':(', 'Shared object changed'); + +is($$sref1, 'foo', 'Scalar ref unchanged'); +is($$sref2, 'zzz', 'Shared scalar ref changed'); + +exit(0); + +# EOF diff --git a/dist/Thread-Queue/t/03_peek.t b/dist/Thread-Queue/t/03_peek.t new file mode 100644 index 0000000000..1844c06dea --- /dev/null +++ b/dist/Thread-Queue/t/03_peek.t @@ -0,0 +1,56 @@ +use strict; +use warnings; + +BEGIN { + if ($ENV{'PERL_CORE'}){ + chdir('t'); + unshift(@INC, '../lib'); + } + use Config; + if (! $Config{'useithreads'}) { + print("1..0 # SKIP Perl not compiled with 'useithreads'\n"); + exit(0); + } +} + +use threads; +use Thread::Queue; + +if ($] == 5.008) { + require 't/test.pl'; # Test::More work-alike for Perl 5.8.0 +} else { + require Test::More; +} +Test::More->import(); +plan('tests' => 19); + +my $q = Thread::Queue->new(1..10); +ok($q, 'New queue'); + +$q->enqueue([ qw/foo bar/ ]); + +sub q_check +{ + is($q->peek(3), 4, 'Peek at queue'); + is($q->peek(-3), 9, 'Negative peek'); + + my $nada = $q->peek(20); + ok(! defined($nada), 'Big peek'); + $nada = $q->peek(-20); + ok(! defined($nada), 'Big negative peek'); + + my $ary = $q->peek(-1); + is_deeply($ary, [ qw/foo bar/ ], 'Peek array'); + + is($q->pending(), 11, 'Queue count in thread'); +} + +threads->create(sub { + q_check(); + threads->create('q_check')->join(); +})->join(); +q_check(); + +exit(0); + +# EOF diff --git a/dist/Thread-Queue/t/04_errs.t b/dist/Thread-Queue/t/04_errs.t new file mode 100644 index 0000000000..3479c83c9e --- /dev/null +++ b/dist/Thread-Queue/t/04_errs.t @@ -0,0 +1,75 @@ +use strict; +use warnings; + +BEGIN { + if ($ENV{'PERL_CORE'}){ + chdir('t'); + unshift(@INC, '../lib'); + } +} + +use Thread::Queue; + +use Test::More 'tests' => 26; + +my $q = Thread::Queue->new(1..10); +ok($q, 'New queue'); + +eval { $q->dequeue(undef); }; +like($@, qr/Invalid 'count'/, $@); +eval { $q->dequeue(0); }; +like($@, qr/Invalid 'count'/, $@); +eval { $q->dequeue(0.5); }; +like($@, qr/Invalid 'count'/, $@); +eval { $q->dequeue(-1); }; +like($@, qr/Invalid 'count'/, $@); +eval { $q->dequeue('foo'); }; +like($@, qr/Invalid 'count'/, $@); + +eval { $q->dequeue_nb(undef); }; +like($@, qr/Invalid 'count'/, $@); +eval { $q->dequeue_nb(0); }; +like($@, qr/Invalid 'count'/, $@); +eval { $q->dequeue_nb(-0.5); }; +like($@, qr/Invalid 'count'/, $@); +eval { $q->dequeue_nb(-1); }; +like($@, qr/Invalid 'count'/, $@); +eval { $q->dequeue_nb('foo'); }; +like($@, qr/Invalid 'count'/, $@); + +eval { $q->peek(undef); }; +like($@, qr/Invalid 'index'/, $@); +eval { $q->peek(3.3); }; +like($@, qr/Invalid 'index'/, $@); +eval { $q->peek('foo'); }; +like($@, qr/Invalid 'index'/, $@); + +eval { $q->insert(); }; +like($@, qr/Invalid 'index'/, $@); +eval { $q->insert(undef); }; +like($@, qr/Invalid 'index'/, $@); +eval { $q->insert(.22); }; +like($@, qr/Invalid 'index'/, $@); +eval { $q->insert('foo'); }; +like($@, qr/Invalid 'index'/, $@); + +eval { $q->extract(undef); }; +like($@, qr/Invalid 'index'/, $@); +eval { $q->extract('foo'); }; +like($@, qr/Invalid 'index'/, $@); +eval { $q->extract(1.1); }; +like($@, qr/Invalid 'index'/, $@); +eval { $q->extract(0, undef); }; +like($@, qr/Invalid 'count'/, $@); +eval { $q->extract(0, 0); }; +like($@, qr/Invalid 'count'/, $@); +eval { $q->extract(0, 3.3); }; +like($@, qr/Invalid 'count'/, $@); +eval { $q->extract(0, -1); }; +like($@, qr/Invalid 'count'/, $@); +eval { $q->extract(0, 'foo'); }; +like($@, qr/Invalid 'count'/, $@); + +exit(0); + +# EOF diff --git a/dist/Thread-Queue/t/05_extract.t b/dist/Thread-Queue/t/05_extract.t new file mode 100644 index 0000000000..2773340932 --- /dev/null +++ b/dist/Thread-Queue/t/05_extract.t @@ -0,0 +1,78 @@ +use strict; +use warnings; + +BEGIN { + if ($ENV{'PERL_CORE'}){ + chdir('t'); + unshift(@INC, '../lib'); + } + use Config; + if (! $Config{'useithreads'}) { + print("1..0 # SKIP Perl not compiled with 'useithreads'\n"); + exit(0); + } +} + +use threads; +use Thread::Queue; + +if ($] == 5.008) { + require 't/test.pl'; # Test::More work-alike for Perl 5.8.0 +} else { + require Test::More; +} +Test::More->import(); +plan('tests' => 20); + +my $q = Thread::Queue->new(1..10); +ok($q, 'New queue'); + +threads->create(sub { + # Default count = 1 + is($q->extract(), 1, 'No args'); # 2..10 left + is($q->extract(0), 2, 'Head'); # 3..10 left + is($q->extract(5), 8, 'Pos index'); # 3..7,9,10 left + is($q->extract(-3), 7, 'Neg index'); # 3..6,9,10 left + my $x = $q->extract(20); # unchanged + ok(! defined($x), 'Big index'); + $x = $q->extract(-20); # unchanged + ok(! defined($x), 'Big neg index'); +})->join(); + +$q = Thread::Queue->new(1..10); +ok($q, 'New queue'); + +threads->create(sub { + my @x = $q->extract(0, 2); # 3..10 left + is_deeply(\@x, [1,2], '2 from head'); + @x = $q->extract(6, 2); # 3..8 left + is_deeply(\@x, [9,10], '2 from tail'); + @x = $q->extract(2, 2); # 3,4,7,8 left + is_deeply(\@x, [5,6], '2 from middle'); + @x = $q->extract(2, 4); # 3,4 left + is_deeply(\@x, [7,8], 'Lots from tail'); + @x = $q->extract(3, 4); # unchanged + is_deeply(\@x, [], 'Too far'); +})->join(); + +$q = Thread::Queue->new(1..10); +ok($q, 'New queue'); + +threads->create(sub { + my @x = $q->extract(-4, 2); # 1..6,9,10 left + is_deeply(\@x, [7,8], 'Neg index'); + @x = $q->extract(-2, 4); # 1..6 left + is_deeply(\@x, [9,10], 'Lots from tail'); + @x = $q->extract(-6, 2); # 3..6 left + is_deeply(\@x, [1,2], 'Max neg index'); + @x = $q->extract(-10, 3); # unchanged + is_deeply(\@x, [], 'Too far'); + @x = $q->extract(-6, 3); # 4..6 left + is_deeply(\@x, [3], 'Neg overlap'); + @x = $q->extract(-5, 10); # empty + is_deeply(\@x, [4..6], 'Neg big overlap'); +})->join(); + +exit(0); + +# EOF diff --git a/dist/Thread-Queue/t/06_insert.t b/dist/Thread-Queue/t/06_insert.t new file mode 100644 index 0000000000..4f9c2b47ec --- /dev/null +++ b/dist/Thread-Queue/t/06_insert.t @@ -0,0 +1,106 @@ +use strict; +use warnings; + +BEGIN { + if ($ENV{'PERL_CORE'}){ + chdir('t'); + unshift(@INC, '../lib'); + } + use Config; + if (! $Config{'useithreads'}) { + print("1..0 # SKIP Perl not compiled with 'useithreads'\n"); + exit(0); + } +} + +use threads; +use Thread::Queue; + +if ($] == 5.008) { + require 't/test.pl'; # Test::More work-alike for Perl 5.8.0 +} else { + require Test::More; +} +Test::More->import(); +plan('tests' => 16); + +my $q = Thread::Queue->new(1..10); +ok($q, 'New queue'); + +threads->create(sub { + $q->insert(5); + $q->insert(-5); + $q->insert(100); + $q->insert(-100); +})->join(); + +my @x = $q->dequeue_nb(100); +is_deeply(\@x, [1..10], 'No-op inserts'); + + +$q = Thread::Queue->new(1..10); +ok($q, 'New queue'); + +threads->create(sub { + $q->insert(10, qw/tail/); + $q->insert(0, qw/head/); +})->join(); + +@x = $q->dequeue_nb(100); +is_deeply(\@x, ['head',1..10,'tail'], 'Edge inserts'); + + +$q = Thread::Queue->new(1..10); +ok($q, 'New queue'); + +threads->create(sub { + $q->insert(5, qw/foo bar/); + $q->insert(-2, qw/qux/); +})->join(); + +@x = $q->dequeue_nb(100); +is_deeply(\@x, [1..5,'foo','bar',6..8,'qux',9,10], 'Middle inserts'); + + +$q = Thread::Queue->new(1..10); +ok($q, 'New queue'); + +threads->create(sub { + $q->insert(20, qw/tail/); + $q->insert(-20, qw/head/); +})->join(); + +@x = $q->dequeue_nb(100); +is_deeply(\@x, ['head',1..10,'tail'], 'Extreme inserts'); + + +$q = Thread::Queue->new(); +ok($q, 'New queue'); +threads->create(sub { $q->insert(0, 1..3); })->join(); +@x = $q->dequeue_nb(100); +is_deeply(\@x, [1..3], 'Empty queue insert'); + +$q = Thread::Queue->new(); +ok($q, 'New queue'); +threads->create(sub { $q->insert(20, 1..3); })->join(); +@x = $q->dequeue_nb(100); +is_deeply(\@x, [1..3], 'Empty queue insert'); + +$q = Thread::Queue->new(); +ok($q, 'New queue'); +threads->create(sub { $q->insert(-1, 1..3); })->join(); +@x = $q->dequeue_nb(100); +is_deeply(\@x, [1..3], 'Empty queue insert'); + +$q = Thread::Queue->new(); +ok($q, 'New queue'); +threads->create(sub { + $q->insert(2, 1..3); + $q->insert(1, 'foo'); +})->join(); +@x = $q->dequeue_nb(100); +is_deeply(\@x, [1,'foo',2,3], 'Empty queue insert'); + +exit(0); + +# EOF diff --git a/dist/Thread-Queue/t/07_lock.t b/dist/Thread-Queue/t/07_lock.t new file mode 100644 index 0000000000..625159e97c --- /dev/null +++ b/dist/Thread-Queue/t/07_lock.t @@ -0,0 +1,56 @@ +use strict; +use warnings; + +BEGIN { + if ($ENV{'PERL_CORE'}){ + chdir('t'); + unshift(@INC, '../lib'); + } + use Config; + if (! $Config{'useithreads'}) { + print("1..0 # SKIP Perl not compiled with 'useithreads'\n"); + exit(0); + } +} + +use threads; +use Thread::Queue; +use Thread::Semaphore; + +if ($] == 5.008) { + require 't/test.pl'; # Test::More work-alike for Perl 5.8.0 +} else { + require Test::More; +} +Test::More->import(); +plan('tests' => 3); + +# The following tests locking a queue + +my $q = Thread::Queue->new(1..10); +ok($q, 'New queue'); + +my $sm = Thread::Semaphore->new(0); +my $st = Thread::Semaphore->new(0); + +threads->create(sub { + { + lock($q); + $sm->up(); + $st->down(); + threads::yield(); + select(undef, undef, undef, 0.1); + my @x = $q->extract(5,2); + is_deeply(\@x, [6,7], 'Thread dequeues under lock'); + } +})->detach(); + +$sm->down(); +$st->up(); +my @x = $q->dequeue_nb(100); +is_deeply(\@x, [1..5,8..10], 'Main dequeues'); +threads::yield(); + +exit(0); + +# EOF diff --git a/dist/Thread-Queue/t/08_nothreads.t b/dist/Thread-Queue/t/08_nothreads.t new file mode 100644 index 0000000000..7ac43481b3 --- /dev/null +++ b/dist/Thread-Queue/t/08_nothreads.t @@ -0,0 +1,114 @@ +use strict; +use warnings; + +BEGIN { + if ($ENV{'PERL_CORE'}){ + chdir('t'); + unshift(@INC, '../lib'); + } +} + +use Test::More 'tests' => 32; + +use Thread::Queue; + +# Regular array +my @ary1 = qw/foo bar baz/; +push(@ary1, [ 1..3 ], { 'qux' => 99 }); + +# Shared array +my @ary2 :shared = (99, 21, 86); + +# Regular hash-based object +my $obj1 = { + 'foo' => 'bar', + 'qux' => 99, + 'biff' => [ qw/fee fi fo/ ], + 'boff' => { 'bork' => 'true' }, +}; +bless($obj1, 'Foo'); + +# Shared hash-based object +my $obj2 = &threads::shared::share({}); +$$obj2{'bar'} = 86; +$$obj2{'key'} = 'foo'; +bless($obj2, 'Bar'); + +# Scalar ref +my $sref1 = \do{ my $scalar = 'foo'; }; + +# Shared scalar ref object +my $sref2 = \do{ my $scalar = 69; }; +threads::shared::share($sref2); +bless($sref2, 'Baz'); + +# Ref of ref +my $foo = [ 5, 'bork', { 'now' => 123 } ]; +my $bar = \$foo; +my $baz = \$bar; +my $qux = \$baz; +is_deeply($$$$qux, $foo, 'Ref of ref'); + +# Queue up items +my $q = Thread::Queue->new(\@ary1, \@ary2); +ok($q, 'New queue'); +is($q->pending(), 2, 'Queue count'); +$q->enqueue($obj1, $obj2); +is($q->pending(), 4, 'Queue count'); +$q->enqueue($sref1, $sref2, $qux); +is($q->pending(), 7, 'Queue count'); + +# Process items in queue +{ + is($q->pending(), 7, 'Queue count in thread'); + + my $ref = $q->peek(3); + is(ref($ref), 'Bar', 'Item is object'); + + my $tary1 = $q->dequeue(); + ok($tary1, 'Thread got item'); + is(ref($tary1), 'ARRAY', 'Item is array ref'); + is_deeply($tary1, \@ary1, 'Complex array'); + + my $tary2 = $q->dequeue(); + ok($tary2, 'Thread got item'); + is(ref($tary2), 'ARRAY', 'Item is array ref'); + for (my $ii=0; $ii < @ary2; $ii++) { + is($$tary2[$ii], $ary2[$ii], 'Shared array element check'); + } + + my $tobj1 = $q->dequeue(); + ok($tobj1, 'Thread got item'); + is(ref($tobj1), 'Foo', 'Item is object'); + is_deeply($tobj1, $obj1, 'Object comparison'); + + my $tobj2 = $q->dequeue(); + ok($tobj2, 'Thread got item'); + is(ref($tobj2), 'Bar', 'Item is object'); + is($$tobj2{'bar'}, 86, 'Shared object element check'); + is($$tobj2{'key'}, 'foo', 'Shared object element check'); + + my $tsref1 = $q->dequeue(); + ok($tsref1, 'Thread got item'); + is(ref($tsref1), 'SCALAR', 'Item is scalar ref'); + is($$tsref1, 'foo', 'Scalar ref contents'); + + my $tsref2 = $q->dequeue(); + ok($tsref2, 'Thread got item'); + is(ref($tsref2), 'Baz', 'Item is object'); + is($$tsref2, 69, 'Shared scalar ref contents'); + + my $qux = $q->dequeue(); + is_deeply($$$$qux, $foo, 'Ref of ref'); + + is($q->pending(), 0, 'Empty queue'); + my $nothing = $q->dequeue_nb(); + ok(! defined($nothing), 'Nothing on queue'); +} + +# Check results of thread's activities +is($q->pending(), 0, 'Empty queue'); + +exit(0); + +# EOF |