diff options
author | Jerry D. Hedden <jdhedden@cpan.org> | 2012-10-23 22:48:50 -0400 |
---|---|---|
committer | Father Chrysostomos <sprout@cpan.org> | 2012-11-06 12:33:38 -0800 |
commit | 1fd4700eab6b2624e106e061f227763e0893f452 (patch) | |
tree | a5b8a59a01a0d874a05ecd0617992a4cabec6a63 /dist/Thread-Queue | |
parent | 9eb48717912f4fd262be607b74cc3c89014fc58b (diff) | |
download | perl-1fd4700eab6b2624e106e061f227763e0893f452.tar.gz |
Upgrade to Thread::Queue 3.01
Diffstat (limited to 'dist/Thread-Queue')
-rw-r--r-- | dist/Thread-Queue/lib/Thread/Queue.pm | 114 | ||||
-rw-r--r-- | dist/Thread-Queue/t/09_ended.t | 146 |
2 files changed, 229 insertions, 31 deletions
diff --git a/dist/Thread-Queue/lib/Thread/Queue.pm b/dist/Thread-Queue/lib/Thread/Queue.pm index 8588ed5845..0bf16244b2 100644 --- a/dist/Thread-Queue/lib/Thread/Queue.pm +++ b/dist/Thread-Queue/lib/Thread/Queue.pm @@ -3,7 +3,7 @@ package Thread::Queue; use strict; use warnings; -our $VERSION = '2.12'; +our $VERSION = '3.01'; $VERSION = eval $VERSION; use threads::shared 1.21; @@ -20,37 +20,58 @@ sub new { my $class = shift; my @queue :shared = map { shared_clone($_) } @_; - return bless(\@queue, $class); + my %self :shared = ( 'queue' => \@queue ); + return bless(\%self, $class); } # Add items to the tail of a queue sub enqueue { - my $queue = shift; - lock(@$queue); - push(@$queue, map { shared_clone($_) } @_) - and cond_signal(@$queue); + my $self = shift; + lock(%$self); + if ($$self{'ENDED'}) { + require Carp; + Carp::croak("'enqueue' method called on queue that has been 'end'ed"); + } + push(@{$$self{'queue'}}, map { shared_clone($_) } @_) + and cond_signal(%$self); } # Return a count of the number of items on a queue sub pending { - my $queue = shift; - lock(@$queue); - return scalar(@$queue); + my $self = shift; + lock(%$self); + return if ($$self{'ENDED'} && ! @{$$self{'queue'}}); + return scalar(@{$$self{'queue'}}); +} + +# Indicate that no more data will enter the queue +sub end +{ + my $self = shift; + lock $self; + # No more data is coming + $$self{'ENDED'} = 1; + # Try to release at least one blocked thread + cond_signal(%$self); } # Return 1 or more items from the head of a queue, blocking if needed sub dequeue { - my $queue = shift; - lock(@$queue); + my $self = shift; + lock(%$self); + my $queue = $$self{'queue'}; my $count = @_ ? $validate_count->(shift) : 1; # Wait for requisite number of items - cond_wait(@$queue) until (@$queue >= $count); - cond_signal(@$queue) if (@$queue > $count); + cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'}); + cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'}); + + # If no longer blocking, try getting whatever is left on the queue + return $self->dequeue_nb($count) if ($$self{'ENDED'}); # Return single item return shift(@$queue) if ($count == 1); @@ -64,8 +85,9 @@ sub dequeue # Return items from the head of a queue with no blocking sub dequeue_nb { - my $queue = shift; - lock(@$queue); + my $self = shift; + lock(%$self); + my $queue = $$self{'queue'}; my $count = @_ ? $validate_count->(shift) : 1; @@ -84,17 +106,24 @@ sub dequeue_nb # Return an item without removing it from a queue sub peek { - my $queue = shift; - lock(@$queue); + my $self = shift; + lock(%$self); my $index = @_ ? $validate_index->(shift) : 0; - return $$queue[$index]; + return $$self{'queue'}[$index]; } # Insert items anywhere into a queue sub insert { - my $queue = shift; - lock(@$queue); + my $self = shift; + lock(%$self); + + if ($$self{'ENDED'}) { + require Carp; + Carp::croak("'insert' method called on queue that has been 'end'ed"); + } + + my $queue = $$self{'queue'}; my $index = $validate_index->(shift); @@ -121,14 +150,15 @@ sub insert push(@$queue, @tmp); # Soup's up - cond_signal(@$queue); + cond_signal(%$self); } # Remove items from anywhere in a queue sub extract { - my $queue = shift; - lock(@$queue); + my $self = shift; + lock(%$self); + my $queue = $$self{'queue'}; my $index = @_ ? $validate_index->(shift) : 0; my $count = @_ ? $validate_count->(shift) : 1; @@ -139,7 +169,7 @@ sub extract if ($index < 0) { $count += $index; return if ($count <= 0); # Beyond the head of the queue - return $queue->dequeue_nb($count); # Extract from the head + return $self->dequeue_nb($count); # Extract from the head } } @@ -210,7 +240,7 @@ Thread::Queue - Thread-safe queues =head1 VERSION -This document describes Thread::Queue version 2.12 +This document describes Thread::Queue version 3.01 =head1 SYNOPSIS @@ -223,15 +253,24 @@ This document describes Thread::Queue version 2.12 my $q = Thread::Queue->new(); # A new empty queue # Worker thread - my $thr = threads->create(sub { - while (my $item = $q->dequeue()) { - # Do work on $item - } - })->detach(); + my $thr = threads->create( + sub { + # Thread will loop until no more work + while (defined(my $item = $q->dequeue())) { + # Do work on $item + ... + } + } + ); # Send work to the thread $q->enqueue($item1, ...); + # Signal that there is no more work to be sent + $q->end(); + # Join up with the thread when it finishes + $thr->join(); + ... # Count of items in the queue my $left = $q->pending(); @@ -344,7 +383,18 @@ returned. =item ->pending() -Returns the number of items still in the queue. +Returns the number of items still in the queue. Returns C<undef> if the queue +has been ended (see below), and there are no more items in the queue. + +=item ->end() + +Declares that no more items will be added to the queue. + +All threads blocking on C<dequeue()> calls will be unblocked with any +remaining items in the queue and/or C<undef> being returned. Any subsequent +calls to C<dequeue()> will behave like C<dequeue_nb()>. + +Once ended, no more items may be placed in the queue. =back @@ -464,6 +514,8 @@ L<http://www.cpanforum.com/dist/Thread-Queue> L<threads>, L<threads::shared> +Sample code in the I<examples> directory of this distribution on CPAN. + =head1 MAINTAINER Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>> diff --git a/dist/Thread-Queue/t/09_ended.t b/dist/Thread-Queue/t/09_ended.t new file mode 100644 index 0000000000..a0a9292f08 --- /dev/null +++ b/dist/Thread-Queue/t/09_ended.t @@ -0,0 +1,146 @@ +use strict; +use warnings; + +use Config; + +BEGIN { + if (! $Config{'useithreads'}) { + print("1..0 # SKIP Perl not compiled with 'useithreads'\n"); + exit(0); + } + if (! $Config{'d_select'}) { + print("1..0 # SKIP 'select()' not available for testing\n"); + exit(0); + } +} + +use threads; +use Thread::Queue; + +use Test::More; + +my $num_threads = 3; +my $cycles = 2; +my $count = 2; +plan tests => 3*$num_threads*$cycles*$count + 6*$num_threads + 6; + +# Test for end() while threads are blocked and no more items in queue +{ + my @items = 1..($num_threads*$cycles*$count); + my $q = Thread::Queue->new(@items); + my $r = Thread::Queue->new(); + + my @threads; + for my $ii (1..$num_threads) { + push @threads, threads->create( sub { + # Thread will loop until no more work is coming + LOOP: + while (my @set = $q->dequeue($count)) { + foreach my $item (@set) { + last LOOP if (! defined($item)); + pass("'$item' read from queue in thread $ii"); + } + select(undef, undef, undef, rand(1)); + $r->enqueue($ii); + } + pass("Thread $ii exiting"); + }); + } + + # Make sure there's nothing in the queue and threads are blocking + for my $ii (1..($num_threads*$cycles)) { + $r->dequeue(); + } + sleep(1); + threads->yield(); + + is($q->pending(), 0, 'Queue is empty'); + + # Signal no more work is coming + $q->end(); + + is($q->pending(), undef, 'Queue is ended'); + + for my $thread (@threads) { + $thread->join; + pass($thread->tid." joined"); + } +} + +# Test for end() while threads are blocked and items still remain in queue +{ + my @items = 1..($num_threads*$cycles*$count + 1); + my $q = Thread::Queue->new(@items); + my $r = Thread::Queue->new(); + + my @threads; + for my $ii (1..$num_threads) { + push @threads, threads->create( sub { + # Thread will loop until no more work is coming + LOOP: + while (my @set = $q->dequeue($count)) { + foreach my $item (@set) { + last LOOP if (! defined($item)); + pass("'$item' read from queue in thread $ii"); + } + select(undef, undef, undef, rand(1)); + $r->enqueue($ii); + } + pass("Thread $ii exiting"); + }); + } + + # Make sure there's nothing in the queue and threads are blocking + for my $ii (1..($num_threads*$cycles)) { + $r->dequeue(); + } + sleep(1); + threads->yield(); + + is($q->pending(), 1, 'Queue has one left'); + + # Signal no more work is coming + $q->end(); + + for my $thread (@threads) { + $thread->join; + pass($thread->tid." joined"); + } + + is($q->pending(), undef, 'Queue is ended'); +} + +# Test of end() send while items in queue +{ + my @items = 1..($num_threads*$cycles*$count + 1); + my $q = Thread::Queue->new(@items); + + my @threads; + for my $ii (1..$num_threads) { + push @threads, threads->create( sub { + # Thread will loop until no more work is coming + LOOP: + while (my @set = $q->dequeue($count)) { + foreach my $item (@set) { + last LOOP if (! defined($item)); + pass("'$item' read from queue in thread $ii"); + } + select(undef, undef, undef, rand(1)); + } + pass("Thread $ii exiting"); + }); + } + + # Signal no more work is coming to the blocked threads, they + # should unblock. + $q->end(); + + for my $thread (@threads) { + $thread->join; + pass($thread->tid." joined"); + } +} + +exit(0); + +# EOF |