summaryrefslogtreecommitdiff
path: root/dist/Thread-Queue
diff options
context:
space:
mode:
authorJerry D. Hedden <jdhedden@cpan.org>2012-10-23 22:48:50 -0400
committerFather Chrysostomos <sprout@cpan.org>2012-11-06 12:33:38 -0800
commit1fd4700eab6b2624e106e061f227763e0893f452 (patch)
treea5b8a59a01a0d874a05ecd0617992a4cabec6a63 /dist/Thread-Queue
parent9eb48717912f4fd262be607b74cc3c89014fc58b (diff)
downloadperl-1fd4700eab6b2624e106e061f227763e0893f452.tar.gz
Upgrade to Thread::Queue 3.01
Diffstat (limited to 'dist/Thread-Queue')
-rw-r--r--dist/Thread-Queue/lib/Thread/Queue.pm114
-rw-r--r--dist/Thread-Queue/t/09_ended.t146
2 files changed, 229 insertions, 31 deletions
diff --git a/dist/Thread-Queue/lib/Thread/Queue.pm b/dist/Thread-Queue/lib/Thread/Queue.pm
index 8588ed5845..0bf16244b2 100644
--- a/dist/Thread-Queue/lib/Thread/Queue.pm
+++ b/dist/Thread-Queue/lib/Thread/Queue.pm
@@ -3,7 +3,7 @@ package Thread::Queue;
use strict;
use warnings;
-our $VERSION = '2.12';
+our $VERSION = '3.01';
$VERSION = eval $VERSION;
use threads::shared 1.21;
@@ -20,37 +20,58 @@ sub new
{
my $class = shift;
my @queue :shared = map { shared_clone($_) } @_;
- return bless(\@queue, $class);
+ my %self :shared = ( 'queue' => \@queue );
+ return bless(\%self, $class);
}
# Add items to the tail of a queue
sub enqueue
{
- my $queue = shift;
- lock(@$queue);
- push(@$queue, map { shared_clone($_) } @_)
- and cond_signal(@$queue);
+ my $self = shift;
+ lock(%$self);
+ if ($$self{'ENDED'}) {
+ require Carp;
+ Carp::croak("'enqueue' method called on queue that has been 'end'ed");
+ }
+ push(@{$$self{'queue'}}, map { shared_clone($_) } @_)
+ and cond_signal(%$self);
}
# Return a count of the number of items on a queue
sub pending
{
- my $queue = shift;
- lock(@$queue);
- return scalar(@$queue);
+ my $self = shift;
+ lock(%$self);
+ return if ($$self{'ENDED'} && ! @{$$self{'queue'}});
+ return scalar(@{$$self{'queue'}});
+}
+
+# Indicate that no more data will enter the queue
+sub end
+{
+ my $self = shift;
+ lock $self;
+ # No more data is coming
+ $$self{'ENDED'} = 1;
+ # Try to release at least one blocked thread
+ cond_signal(%$self);
}
# Return 1 or more items from the head of a queue, blocking if needed
sub dequeue
{
- my $queue = shift;
- lock(@$queue);
+ my $self = shift;
+ lock(%$self);
+ my $queue = $$self{'queue'};
my $count = @_ ? $validate_count->(shift) : 1;
# Wait for requisite number of items
- cond_wait(@$queue) until (@$queue >= $count);
- cond_signal(@$queue) if (@$queue > $count);
+ cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
+ cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
+
+ # If no longer blocking, try getting whatever is left on the queue
+ return $self->dequeue_nb($count) if ($$self{'ENDED'});
# Return single item
return shift(@$queue) if ($count == 1);
@@ -64,8 +85,9 @@ sub dequeue
# Return items from the head of a queue with no blocking
sub dequeue_nb
{
- my $queue = shift;
- lock(@$queue);
+ my $self = shift;
+ lock(%$self);
+ my $queue = $$self{'queue'};
my $count = @_ ? $validate_count->(shift) : 1;
@@ -84,17 +106,24 @@ sub dequeue_nb
# Return an item without removing it from a queue
sub peek
{
- my $queue = shift;
- lock(@$queue);
+ my $self = shift;
+ lock(%$self);
my $index = @_ ? $validate_index->(shift) : 0;
- return $$queue[$index];
+ return $$self{'queue'}[$index];
}
# Insert items anywhere into a queue
sub insert
{
- my $queue = shift;
- lock(@$queue);
+ my $self = shift;
+ lock(%$self);
+
+ if ($$self{'ENDED'}) {
+ require Carp;
+ Carp::croak("'insert' method called on queue that has been 'end'ed");
+ }
+
+ my $queue = $$self{'queue'};
my $index = $validate_index->(shift);
@@ -121,14 +150,15 @@ sub insert
push(@$queue, @tmp);
# Soup's up
- cond_signal(@$queue);
+ cond_signal(%$self);
}
# Remove items from anywhere in a queue
sub extract
{
- my $queue = shift;
- lock(@$queue);
+ my $self = shift;
+ lock(%$self);
+ my $queue = $$self{'queue'};
my $index = @_ ? $validate_index->(shift) : 0;
my $count = @_ ? $validate_count->(shift) : 1;
@@ -139,7 +169,7 @@ sub extract
if ($index < 0) {
$count += $index;
return if ($count <= 0); # Beyond the head of the queue
- return $queue->dequeue_nb($count); # Extract from the head
+ return $self->dequeue_nb($count); # Extract from the head
}
}
@@ -210,7 +240,7 @@ Thread::Queue - Thread-safe queues
=head1 VERSION
-This document describes Thread::Queue version 2.12
+This document describes Thread::Queue version 3.01
=head1 SYNOPSIS
@@ -223,15 +253,24 @@ This document describes Thread::Queue version 2.12
my $q = Thread::Queue->new(); # A new empty queue
# Worker thread
- my $thr = threads->create(sub {
- while (my $item = $q->dequeue()) {
- # Do work on $item
- }
- })->detach();
+ my $thr = threads->create(
+ sub {
+ # Thread will loop until no more work
+ while (defined(my $item = $q->dequeue())) {
+ # Do work on $item
+ ...
+ }
+ }
+ );
# Send work to the thread
$q->enqueue($item1, ...);
+ # Signal that there is no more work to be sent
+ $q->end();
+ # Join up with the thread when it finishes
+ $thr->join();
+ ...
# Count of items in the queue
my $left = $q->pending();
@@ -344,7 +383,18 @@ returned.
=item ->pending()
-Returns the number of items still in the queue.
+Returns the number of items still in the queue. Returns C<undef> if the queue
+has been ended (see below), and there are no more items in the queue.
+
+=item ->end()
+
+Declares that no more items will be added to the queue.
+
+All threads blocking on C<dequeue()> calls will be unblocked with any
+remaining items in the queue and/or C<undef> being returned. Any subsequent
+calls to C<dequeue()> will behave like C<dequeue_nb()>.
+
+Once ended, no more items may be placed in the queue.
=back
@@ -464,6 +514,8 @@ L<http://www.cpanforum.com/dist/Thread-Queue>
L<threads>, L<threads::shared>
+Sample code in the I<examples> directory of this distribution on CPAN.
+
=head1 MAINTAINER
Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
diff --git a/dist/Thread-Queue/t/09_ended.t b/dist/Thread-Queue/t/09_ended.t
new file mode 100644
index 0000000000..a0a9292f08
--- /dev/null
+++ b/dist/Thread-Queue/t/09_ended.t
@@ -0,0 +1,146 @@
+use strict;
+use warnings;
+
+use Config;
+
+BEGIN {
+ if (! $Config{'useithreads'}) {
+ print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
+ exit(0);
+ }
+ if (! $Config{'d_select'}) {
+ print("1..0 # SKIP 'select()' not available for testing\n");
+ exit(0);
+ }
+}
+
+use threads;
+use Thread::Queue;
+
+use Test::More;
+
+my $num_threads = 3;
+my $cycles = 2;
+my $count = 2;
+plan tests => 3*$num_threads*$cycles*$count + 6*$num_threads + 6;
+
+# Test for end() while threads are blocked and no more items in queue
+{
+ my @items = 1..($num_threads*$cycles*$count);
+ my $q = Thread::Queue->new(@items);
+ my $r = Thread::Queue->new();
+
+ my @threads;
+ for my $ii (1..$num_threads) {
+ push @threads, threads->create( sub {
+ # Thread will loop until no more work is coming
+ LOOP:
+ while (my @set = $q->dequeue($count)) {
+ foreach my $item (@set) {
+ last LOOP if (! defined($item));
+ pass("'$item' read from queue in thread $ii");
+ }
+ select(undef, undef, undef, rand(1));
+ $r->enqueue($ii);
+ }
+ pass("Thread $ii exiting");
+ });
+ }
+
+ # Make sure there's nothing in the queue and threads are blocking
+ for my $ii (1..($num_threads*$cycles)) {
+ $r->dequeue();
+ }
+ sleep(1);
+ threads->yield();
+
+ is($q->pending(), 0, 'Queue is empty');
+
+ # Signal no more work is coming
+ $q->end();
+
+ is($q->pending(), undef, 'Queue is ended');
+
+ for my $thread (@threads) {
+ $thread->join;
+ pass($thread->tid." joined");
+ }
+}
+
+# Test for end() while threads are blocked and items still remain in queue
+{
+ my @items = 1..($num_threads*$cycles*$count + 1);
+ my $q = Thread::Queue->new(@items);
+ my $r = Thread::Queue->new();
+
+ my @threads;
+ for my $ii (1..$num_threads) {
+ push @threads, threads->create( sub {
+ # Thread will loop until no more work is coming
+ LOOP:
+ while (my @set = $q->dequeue($count)) {
+ foreach my $item (@set) {
+ last LOOP if (! defined($item));
+ pass("'$item' read from queue in thread $ii");
+ }
+ select(undef, undef, undef, rand(1));
+ $r->enqueue($ii);
+ }
+ pass("Thread $ii exiting");
+ });
+ }
+
+ # Make sure there's nothing in the queue and threads are blocking
+ for my $ii (1..($num_threads*$cycles)) {
+ $r->dequeue();
+ }
+ sleep(1);
+ threads->yield();
+
+ is($q->pending(), 1, 'Queue has one left');
+
+ # Signal no more work is coming
+ $q->end();
+
+ for my $thread (@threads) {
+ $thread->join;
+ pass($thread->tid." joined");
+ }
+
+ is($q->pending(), undef, 'Queue is ended');
+}
+
+# Test of end() send while items in queue
+{
+ my @items = 1..($num_threads*$cycles*$count + 1);
+ my $q = Thread::Queue->new(@items);
+
+ my @threads;
+ for my $ii (1..$num_threads) {
+ push @threads, threads->create( sub {
+ # Thread will loop until no more work is coming
+ LOOP:
+ while (my @set = $q->dequeue($count)) {
+ foreach my $item (@set) {
+ last LOOP if (! defined($item));
+ pass("'$item' read from queue in thread $ii");
+ }
+ select(undef, undef, undef, rand(1));
+ }
+ pass("Thread $ii exiting");
+ });
+ }
+
+ # Signal no more work is coming to the blocked threads, they
+ # should unblock.
+ $q->end();
+
+ for my $thread (@threads) {
+ $thread->join;
+ pass($thread->tid." joined");
+ }
+}
+
+exit(0);
+
+# EOF