summaryrefslogtreecommitdiff
path: root/dist/Thread-Queue
diff options
context:
space:
mode:
authorNicholas Clark <nick@ccl4.org>2009-09-28 17:00:50 +0100
committerNicholas Clark <nick@ccl4.org>2009-09-29 11:12:38 +0100
commite4c73f034798b83906105bd2b28ce3805d0fa348 (patch)
treef2674de541e0aa86361691763314e28adf933f29 /dist/Thread-Queue
parent51bde7eebd4cedd091a03b9c38593f8e945fb4a9 (diff)
downloadperl-e4c73f034798b83906105bd2b28ce3805d0fa348.tar.gz
Move Thread::Queue from ext/ to dist/
Diffstat (limited to 'dist/Thread-Queue')
-rw-r--r--dist/Thread-Queue/lib/Thread/Queue.pm481
-rw-r--r--dist/Thread-Queue/t/01_basic.t134
-rw-r--r--dist/Thread-Queue/t/02_refs.t189
-rw-r--r--dist/Thread-Queue/t/03_peek.t56
-rw-r--r--dist/Thread-Queue/t/04_errs.t75
-rw-r--r--dist/Thread-Queue/t/05_extract.t78
-rw-r--r--dist/Thread-Queue/t/06_insert.t106
-rw-r--r--dist/Thread-Queue/t/07_lock.t56
-rw-r--r--dist/Thread-Queue/t/08_nothreads.t114
9 files changed, 1289 insertions, 0 deletions
diff --git a/dist/Thread-Queue/lib/Thread/Queue.pm b/dist/Thread-Queue/lib/Thread/Queue.pm
new file mode 100644
index 0000000000..631edf126a
--- /dev/null
+++ b/dist/Thread-Queue/lib/Thread/Queue.pm
@@ -0,0 +1,481 @@
+package Thread::Queue;
+
+use strict;
+use warnings;
+
+our $VERSION = '2.11';
+
+use threads::shared 1.21;
+use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
+
+# Carp errors from threads::shared calls should complain about caller
+our @CARP_NOT = ("threads::shared");
+
+# Predeclarations for internal functions
+my ($validate_count, $validate_index);
+
+# Create a new queue possibly pre-populated with items
+sub new
+{
+ my $class = shift;
+ my @queue :shared = map { shared_clone($_) } @_;
+ return bless(\@queue, $class);
+}
+
+# Add items to the tail of a queue
+sub enqueue
+{
+ my $queue = shift;
+ lock(@$queue);
+ push(@$queue, map { shared_clone($_) } @_)
+ and cond_signal(@$queue);
+}
+
+# Return a count of the number of items on a queue
+sub pending
+{
+ my $queue = shift;
+ lock(@$queue);
+ return scalar(@$queue);
+}
+
+# Return 1 or more items from the head of a queue, blocking if needed
+sub dequeue
+{
+ my $queue = shift;
+ lock(@$queue);
+
+ my $count = @_ ? $validate_count->(shift) : 1;
+
+ # Wait for requisite number of items
+ cond_wait(@$queue) until (@$queue >= $count);
+ cond_signal(@$queue) if (@$queue > $count);
+
+ # Return single item
+ return shift(@$queue) if ($count == 1);
+
+ # Return multiple items
+ my @items;
+ push(@items, shift(@$queue)) for (1..$count);
+ return @items;
+}
+
+# Return items from the head of a queue with no blocking
+sub dequeue_nb
+{
+ my $queue = shift;
+ lock(@$queue);
+
+ my $count = @_ ? $validate_count->(shift) : 1;
+
+ # Return single item
+ return shift(@$queue) if ($count == 1);
+
+ # Return multiple items
+ my @items;
+ for (1..$count) {
+ last if (! @$queue);
+ push(@items, shift(@$queue));
+ }
+ return @items;
+}
+
+# Return an item without removing it from a queue
+sub peek
+{
+ my $queue = shift;
+ lock(@$queue);
+ my $index = @_ ? $validate_index->(shift) : 0;
+ return $$queue[$index];
+}
+
+# Insert items anywhere into a queue
+sub insert
+{
+ my $queue = shift;
+ lock(@$queue);
+
+ my $index = $validate_index->(shift);
+
+ return if (! @_); # Nothing to insert
+
+ # Support negative indices
+ if ($index < 0) {
+ $index += @$queue;
+ if ($index < 0) {
+ $index = 0;
+ }
+ }
+
+ # Dequeue items from $index onward
+ my @tmp;
+ while (@$queue > $index) {
+ unshift(@tmp, pop(@$queue))
+ }
+
+ # Add new items to the queue
+ push(@$queue, map { shared_clone($_) } @_);
+
+ # Add previous items back onto the queue
+ push(@$queue, @tmp);
+
+ # Soup's up
+ cond_signal(@$queue);
+}
+
+# Remove items from anywhere in a queue
+sub extract
+{
+ my $queue = shift;
+ lock(@$queue);
+
+ my $index = @_ ? $validate_index->(shift) : 0;
+ my $count = @_ ? $validate_count->(shift) : 1;
+
+ # Support negative indices
+ if ($index < 0) {
+ $index += @$queue;
+ if ($index < 0) {
+ $count += $index;
+ return if ($count <= 0); # Beyond the head of the queue
+ return $queue->dequeue_nb($count); # Extract from the head
+ }
+ }
+
+ # Dequeue items from $index+$count onward
+ my @tmp;
+ while (@$queue > ($index+$count)) {
+ unshift(@tmp, pop(@$queue))
+ }
+
+ # Extract desired items
+ my @items;
+ unshift(@items, pop(@$queue)) while (@$queue > $index);
+
+ # Add back any removed items
+ push(@$queue, @tmp);
+
+ # Return single item
+ return $items[0] if ($count == 1);
+
+ # Return multiple items
+ return @items;
+}
+
+### Internal Functions ###
+
+# Check value of the requested index
+$validate_index = sub {
+ my $index = shift;
+
+ if (! defined($index) ||
+ ! looks_like_number($index) ||
+ (int($index) != $index))
+ {
+ require Carp;
+ my ($method) = (caller(1))[3];
+ $method =~ s/Thread::Queue:://;
+ $index = 'undef' if (! defined($index));
+ Carp::croak("Invalid 'index' argument ($index) to '$method' method");
+ }
+
+ return $index;
+};
+
+# Check value of the requested count
+$validate_count = sub {
+ my $count = shift;
+
+ if (! defined($count) ||
+ ! looks_like_number($count) ||
+ (int($count) != $count) ||
+ ($count < 1))
+ {
+ require Carp;
+ my ($method) = (caller(1))[3];
+ $method =~ s/Thread::Queue:://;
+ $count = 'undef' if (! defined($count));
+ Carp::croak("Invalid 'count' argument ($count) to '$method' method");
+ }
+
+ return $count;
+};
+
+1;
+
+=head1 NAME
+
+Thread::Queue - Thread-safe queues
+
+=head1 VERSION
+
+This document describes Thread::Queue version 2.11
+
+=head1 SYNOPSIS
+
+ use strict;
+ use warnings;
+
+ use threads;
+ use Thread::Queue;
+
+ my $q = Thread::Queue->new(); # A new empty queue
+
+ # Worker thread
+ my $thr = threads->create(sub {
+ while (my $item = $q->dequeue()) {
+ # Do work on $item
+ }
+ })->detach();
+
+ # Send work to the thread
+ $q->enqueue($item1, ...);
+
+
+ # Count of items in the queue
+ my $left = $q->pending();
+
+ # Non-blocking dequeue
+ if (defined(my $item = $q->dequeue_nb())) {
+ # Work on $item
+ }
+
+ # Get the second item in the queue without dequeuing anything
+ my $item = $q->peek(1);
+
+ # Insert two items into the queue just behind the head
+ $q->insert(1, $item1, $item2);
+
+ # Extract the last two items on the queue
+ my ($item1, $item2) = $q->extract(-2, 2);
+
+=head1 DESCRIPTION
+
+This module provides thread-safe FIFO queues that can be accessed safely by
+any number of threads.
+
+Any data types supported by L<threads::shared> can be passed via queues:
+
+=over
+
+=item Ordinary scalars
+
+=item Array refs
+
+=item Hash refs
+
+=item Scalar refs
+
+=item Objects based on the above
+
+=back
+
+Ordinary scalars are added to queues as they are.
+
+If not already thread-shared, the other complex data types will be cloned
+(recursively, if needed, and including any C<bless>ings and read-only
+settings) into thread-shared structures before being placed onto a queue.
+
+For example, the following would cause L<Thread::Queue> to create a empty,
+shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
+and 'baz' from C<@ary> into it, and then place that shared reference onto
+the queue:
+
+ my @ary = qw/foo bar baz/;
+ $q->enqueue(\@ary);
+
+However, for the following, the items are already shared, so their references
+are added directly to the queue, and no cloning takes place:
+
+ my @ary :shared = qw/foo bar baz/;
+ $q->enqueue(\@ary);
+
+ my $obj = &shared({});
+ $$obj{'foo'} = 'bar';
+ $$obj{'qux'} = 99;
+ bless($obj, 'My::Class');
+ $q->enqueue($obj);
+
+See L</"LIMITATIONS"> for caveats related to passing objects via queues.
+
+=head1 QUEUE CREATION
+
+=over
+
+=item ->new()
+
+Creates a new empty queue.
+
+=item ->new(LIST)
+
+Creates a new queue pre-populated with the provided list of items.
+
+=back
+
+=head1 BASIC METHODS
+
+The following methods deal with queues on a FIFO basis.
+
+=over
+
+=item ->enqueue(LIST)
+
+Adds a list of items onto the end of the queue.
+
+=item ->dequeue()
+
+=item ->dequeue(COUNT)
+
+Removes the requested number of items (default is 1) from the head of the
+queue, and returns them. If the queue contains fewer than the requested
+number of items, then the thread will be blocked until the requisite number
+of items are available (i.e., until other threads <enqueue> more items).
+
+=item ->dequeue_nb()
+
+=item ->dequeue_nb(COUNT)
+
+Removes the requested number of items (default is 1) from the head of the
+queue, and returns them. If the queue contains fewer than the requested
+number of items, then it immediately (i.e., non-blocking) returns whatever
+items there are on the queue. If the queue is empty, then C<undef> is
+returned.
+
+=item ->pending()
+
+Returns the number of items still in the queue.
+
+=back
+
+=head1 ADVANCED METHODS
+
+The following methods can be used to manipulate items anywhere in a queue.
+
+To prevent the contents of a queue from being modified by another thread
+while it is being examined and/or changed, L<lock|threads::shared/"lock
+VARIABLE"> the queue inside a local block:
+
+ {
+ lock($q); # Keep other threads from changing the queue's contents
+ my $item = $q->peek();
+ if ($item ...) {
+ ...
+ }
+ }
+ # Queue is now unlocked
+
+=over
+
+=item ->peek()
+
+=item ->peek(INDEX)
+
+Returns an item from the queue without dequeuing anything. Defaults to the
+the head of queue (at index position 0) if no index is specified. Negative
+index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
+is the end of the queue, -2 is next to last, and so on).
+
+If no items exists at the specified index (i.e., the queue is empty, or the
+index is beyond the number of items on the queue), then C<undef> is returned.
+
+Remember, the returned item is not removed from the queue, so manipulating a
+C<peek>ed at reference affects the item on the queue.
+
+=item ->insert(INDEX, LIST)
+
+Adds the list of items to the queue at the specified index position (0
+is the head of the list). Any existing items at and beyond that position are
+pushed back past the newly added items:
+
+ $q->enqueue(1, 2, 3, 4);
+ $q->insert(1, qw/foo bar/);
+ # Queue now contains: 1, foo, bar, 2, 3, 4
+
+Specifying an index position greater than the number of items in the queue
+just adds the list to the end.
+
+Negative index positions are supported:
+
+ $q->enqueue(1, 2, 3, 4);
+ $q->insert(-2, qw/foo bar/);
+ # Queue now contains: 1, 2, foo, bar, 3, 4
+
+Specifying a negative index position greater than the number of items in the
+queue adds the list to the head of the queue.
+
+=item ->extract()
+
+=item ->extract(INDEX)
+
+=item ->extract(INDEX, COUNT)
+
+Removes and returns the specified number of items (defaults to 1) from the
+specified index position in the queue (0 is the head of the queue). When
+called with no arguments, C<extract> operates the same as C<dequeue_nb>.
+
+This method is non-blocking, and will return only as many items as are
+available to fulfill the request:
+
+ $q->enqueue(1, 2, 3, 4);
+ my $item = $q->extract(2) # Returns 3
+ # Queue now contains: 1, 2, 4
+ my @items = $q->extract(1, 3) # Returns (2, 4)
+ # Queue now contains: 1
+
+Specifying an index position greater than the number of items in the
+queue results in C<undef> or an empty list being returned.
+
+ $q->enqueue('foo');
+ my $nada = $q->extract(3) # Returns undef
+ my @nada = $q->extract(1, 3) # Returns ()
+
+Negative index positions are supported. Specifying a negative index position
+greater than the number of items in the queue may return items from the head
+of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
+queue from the specified position (i.e. if queue size + index + count is
+greater than zero):
+
+ $q->enqueue(qw/foo bar baz/);
+ my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
+ my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
+ # Queue now contains: bar, baz
+ my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
+
+=back
+
+=head1 NOTES
+
+Queues created by L<Thread::Queue> can be used in both threaded and
+non-threaded applications.
+
+=head1 LIMITATIONS
+
+Passing objects on queues may not work if the objects' classes do not support
+sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
+
+Passing array/hash refs that contain objects may not work for Perl prior to
+5.10.0.
+
+=head1 SEE ALSO
+
+Thread::Queue Discussion Forum on CPAN:
+L<http://www.cpanforum.com/dist/Thread-Queue>
+
+Annotated POD for Thread::Queue:
+L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.11/lib/Thread/Queue.pm>
+
+Source repository:
+L<http://code.google.com/p/thread-queue/>
+
+L<threads>, L<threads::shared>
+
+=head1 MAINTAINER
+
+Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
+
+=head1 LICENSE
+
+This program is free software; you can redistribute it and/or modify it under
+the same terms as Perl itself.
+
+=cut
diff --git a/dist/Thread-Queue/t/01_basic.t b/dist/Thread-Queue/t/01_basic.t
new file mode 100644
index 0000000000..6a0d8387e8
--- /dev/null
+++ b/dist/Thread-Queue/t/01_basic.t
@@ -0,0 +1,134 @@
+use strict;
+use warnings;
+
+BEGIN {
+ if ($ENV{'PERL_CORE'}){
+ chdir('t');
+ unshift(@INC, '../lib');
+ }
+ use Config;
+ if (! $Config{'useithreads'}) {
+ print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
+ exit(0);
+ }
+}
+
+use threads;
+use Thread::Queue;
+
+if ($] == 5.008) {
+ require 't/test.pl'; # Test::More work-alike for Perl 5.8.0
+} else {
+ require Test::More;
+}
+Test::More->import();
+plan('tests' => 81);
+
+### Basic usage with multiple threads ###
+
+my $nthreads = 5;
+
+my $q = Thread::Queue->new(1..$nthreads);
+ok($q, 'New queue');
+is($q->pending(), $nthreads, 'Pre-populated queue count');
+
+sub reader {
+ my $id = threads->tid();
+ while ((my $el = $q->dequeue()) != -1) {
+ ok($el >= 1, "Thread $id got $el");
+ select(undef, undef, undef, rand(1));
+ }
+ ok(1, "Thread $id done");
+}
+
+my @threads;
+push(@threads, threads->create('reader')) for (1..$nthreads);
+
+for (1..20) {
+ select(undef, undef, undef, rand(1));
+ $q->enqueue($_);
+}
+
+$q->enqueue((-1) x $nthreads); # One end marker for each thread
+
+$_->join() foreach @threads;
+undef(@threads);
+
+is($q->pending(), 0, 'Empty queue');
+
+
+### ->dequeue_nb() test ###
+
+$q = Thread::Queue->new();
+ok($q, 'New queue');
+is($q->pending(), 0, 'Empty queue');
+
+my @items = qw/foo bar baz/;
+$q->enqueue(@items);
+
+threads->create(sub {
+ is($q->pending(), scalar(@items), 'Queue count in thread');
+ while (my $el = $q->dequeue_nb()) {
+ is($el, shift(@items), "Thread got $el");
+ }
+ is($q->pending(), 0, 'Empty queue');
+ $q->enqueue('done');
+})->join();
+
+is($q->pending(), 1, 'Queue count after thread');
+is($q->dequeue(), 'done', 'Thread reported done');
+is($q->pending(), 0, 'Empty queue');
+
+
+### ->dequeue(COUNT) test ###
+
+my $count = 3;
+
+sub reader2 {
+ my $id = threads->tid();
+ while (1) {
+ my @el = $q->dequeue($count);
+ is(scalar(@el), $count, "Thread $id got @el");
+ select(undef, undef, undef, rand(1));
+ return if ($el[0] == 0);
+ }
+}
+
+push(@threads, threads->create('reader2')) for (1..$nthreads);
+
+$q->enqueue(1..4*$count*$nthreads);
+$q->enqueue((0) x ($count*$nthreads));
+
+$_->join() foreach @threads;
+undef(@threads);
+
+is($q->pending(), 0, 'Empty queue');
+
+
+### ->dequeue_nb(COUNT) test ###
+
+@items = qw/foo bar baz qux exit/;
+$q->enqueue(@items);
+is($q->pending(), scalar(@items), 'Queue count');
+
+threads->create(sub {
+ is($q->pending(), scalar(@items), 'Queue count in thread');
+ while (my @el = $q->dequeue_nb(2)) {
+ is($el[0], shift(@items), "Thread got $el[0]");
+ if ($el[0] eq 'exit') {
+ is(scalar(@el), 1, 'Thread to exit');
+ } else {
+ is($el[1], shift(@items), "Thread got $el[1]");
+ }
+ }
+ is($q->pending(), 0, 'Empty queue');
+ $q->enqueue('done');
+})->join();
+
+is($q->pending(), 1, 'Queue count after thread');
+is($q->dequeue(), 'done', 'Thread reported done');
+is($q->pending(), 0, 'Empty queue');
+
+exit(0);
+
+# EOF
diff --git a/dist/Thread-Queue/t/02_refs.t b/dist/Thread-Queue/t/02_refs.t
new file mode 100644
index 0000000000..3a59b5e3b6
--- /dev/null
+++ b/dist/Thread-Queue/t/02_refs.t
@@ -0,0 +1,189 @@
+use strict;
+use warnings;
+
+BEGIN {
+ if ($ENV{'PERL_CORE'}){
+ chdir('t');
+ unshift(@INC, '../lib');
+ }
+ use Config;
+ if (! $Config{'useithreads'}) {
+ print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
+ exit(0);
+ }
+}
+
+use threads;
+use threads::shared;
+use Thread::Queue;
+
+if ($] == 5.008) {
+ require 't/test.pl'; # Test::More work-alike for Perl 5.8.0
+} else {
+ require Test::More;
+}
+Test::More->import();
+plan('tests' => 46);
+
+# Regular array
+my @ary1 = qw/foo bar baz/;
+push(@ary1, [ 1..3 ], { 'qux' => 99 });
+
+# Shared array
+my @ary2 :shared = (99, 21, 86);
+
+# Regular hash-based object
+my $obj1 = {
+ 'foo' => 'bar',
+ 'qux' => 99,
+ 'biff' => [ qw/fee fi fo/ ],
+ 'boff' => { 'bork' => 'true' },
+};
+bless($obj1, 'Foo');
+
+# Shared hash-based object
+my $obj2 = &share({});
+$$obj2{'bar'} = 86;
+$$obj2{'key'} = 'foo';
+bless($obj2, 'Bar');
+
+# Scalar ref
+my $sref1 = \do{ my $scalar = 'foo'; };
+
+# Shared scalar ref object
+my $sref2 = \do{ my $scalar = 69; };
+share($sref2);
+bless($sref2, 'Baz');
+
+# Ref of ref
+my $foo = [ 5, 'bork', { 'now' => 123 } ];
+my $bar = \$foo;
+my $baz = \$bar;
+my $qux = \$baz;
+is_deeply($$$$qux, $foo, 'Ref of ref');
+
+# Circular refs
+my $cir1;
+$cir1 = \$cir1;
+
+my $cir1s : shared;
+$cir1s = \$cir1s;
+
+my $cir2;
+$cir2 = [ \$cir2, { 'ref' => \$cir2 } ];
+
+my $cir3 :shared = &share({});
+$cir3->{'self'} = \$cir3;
+bless($cir3, 'Circular');
+
+# Queue up items
+my $q = Thread::Queue->new(\@ary1, \@ary2);
+ok($q, 'New queue');
+is($q->pending(), 2, 'Queue count');
+$q->enqueue($obj1, $obj2);
+is($q->pending(), 4, 'Queue count');
+$q->enqueue($sref1, $sref2, $foo, $qux);
+is($q->pending(), 8, 'Queue count');
+$q->enqueue($cir1, $cir1s, $cir2, $cir3);
+is($q->pending(), 12, 'Queue count');
+
+# Process items in thread
+threads->create(sub {
+ is($q->pending(), 12, 'Queue count in thread');
+
+ my $tary1 = $q->dequeue();
+ ok($tary1, 'Thread got item');
+ is(ref($tary1), 'ARRAY', 'Item is array ref');
+ is_deeply($tary1, \@ary1, 'Complex array');
+ $$tary1[1] = 123;
+
+ my $tary2 = $q->dequeue();
+ ok($tary2, 'Thread got item');
+ is(ref($tary2), 'ARRAY', 'Item is array ref');
+ for (my $ii=0; $ii < @ary2; $ii++) {
+ is($$tary2[$ii], $ary2[$ii], 'Shared array element check');
+ }
+ $$tary2[1] = 444;
+
+ my $tobj1 = $q->dequeue();
+ ok($tobj1, 'Thread got item');
+ is(ref($tobj1), 'Foo', 'Item is object');
+ is_deeply($tobj1, $obj1, 'Object comparison');
+ $$tobj1{'foo'} = '.|.';
+ $$tobj1{'smiley'} = ':)';
+
+ my $tobj2 = $q->dequeue();
+ ok($tobj2, 'Thread got item');
+ is(ref($tobj2), 'Bar', 'Item is object');
+ is($$tobj2{'bar'}, 86, 'Shared object element check');
+ is($$tobj2{'key'}, 'foo', 'Shared object element check');
+ $$tobj2{'tick'} = 'tock';
+ $$tobj2{'frowny'} = ':(';
+
+ my $tsref1 = $q->dequeue();
+ ok($tsref1, 'Thread got item');
+ is(ref($tsref1), 'SCALAR', 'Item is scalar ref');
+ is($$tsref1, 'foo', 'Scalar ref contents');
+ $$tsref1 = 0;
+
+ my $tsref2 = $q->dequeue();
+ ok($tsref2, 'Thread got item');
+ is(ref($tsref2), 'Baz', 'Item is object');
+ is($$tsref2, 69, 'Shared scalar ref contents');
+ $$tsref2 = 'zzz';
+
+ my $myfoo = $q->dequeue();
+ is_deeply($myfoo, $foo, 'Array ref');
+
+ my $qux = $q->dequeue();
+ is_deeply($$$$qux, $foo, 'Ref of ref');
+
+ my ($c1, $c1s, $c2, $c3) = $q->dequeue(4);
+ SKIP: {
+ skip("Needs threads::shared >= 1.19", 5)
+ if ($threads::shared::VERSION < 1.19);
+
+ is(threads::shared::_id($$c1),
+ threads::shared::_id($c1),
+ 'Circular ref - scalar');
+
+ is(threads::shared::_id($$c1s),
+ threads::shared::_id($c1s),
+ 'Circular ref - shared scalar');
+
+ is(threads::shared::_id(${$c2->[0]}),
+ threads::shared::_id($c2),
+ 'Circular ref - array');
+
+ is(threads::shared::_id(${$c2->[1]->{'ref'}}),
+ threads::shared::_id($c2),
+ 'Circular ref - mixed');
+
+ is(threads::shared::_id(${$c3->{'self'}}),
+ threads::shared::_id($c3),
+ 'Circular ref - hash');
+ }
+
+ is($q->pending(), 0, 'Empty queue');
+ my $nothing = $q->dequeue_nb();
+ ok(! defined($nothing), 'Nothing on queue');
+})->join();
+
+# Check results of thread's activities
+is($q->pending(), 0, 'Empty queue');
+
+is($ary1[1], 'bar', 'Array unchanged');
+is($ary2[1], 444, 'Shared array changed');
+
+is($$obj1{'foo'}, 'bar', 'Object unchanged');
+ok(! exists($$obj1{'smiley'}), 'Object unchanged');
+
+is($$obj2{'tick'}, 'tock', 'Shared object changed');
+is($$obj2{'frowny'}, ':(', 'Shared object changed');
+
+is($$sref1, 'foo', 'Scalar ref unchanged');
+is($$sref2, 'zzz', 'Shared scalar ref changed');
+
+exit(0);
+
+# EOF
diff --git a/dist/Thread-Queue/t/03_peek.t b/dist/Thread-Queue/t/03_peek.t
new file mode 100644
index 0000000000..1844c06dea
--- /dev/null
+++ b/dist/Thread-Queue/t/03_peek.t
@@ -0,0 +1,56 @@
+use strict;
+use warnings;
+
+BEGIN {
+ if ($ENV{'PERL_CORE'}){
+ chdir('t');
+ unshift(@INC, '../lib');
+ }
+ use Config;
+ if (! $Config{'useithreads'}) {
+ print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
+ exit(0);
+ }
+}
+
+use threads;
+use Thread::Queue;
+
+if ($] == 5.008) {
+ require 't/test.pl'; # Test::More work-alike for Perl 5.8.0
+} else {
+ require Test::More;
+}
+Test::More->import();
+plan('tests' => 19);
+
+my $q = Thread::Queue->new(1..10);
+ok($q, 'New queue');
+
+$q->enqueue([ qw/foo bar/ ]);
+
+sub q_check
+{
+ is($q->peek(3), 4, 'Peek at queue');
+ is($q->peek(-3), 9, 'Negative peek');
+
+ my $nada = $q->peek(20);
+ ok(! defined($nada), 'Big peek');
+ $nada = $q->peek(-20);
+ ok(! defined($nada), 'Big negative peek');
+
+ my $ary = $q->peek(-1);
+ is_deeply($ary, [ qw/foo bar/ ], 'Peek array');
+
+ is($q->pending(), 11, 'Queue count in thread');
+}
+
+threads->create(sub {
+ q_check();
+ threads->create('q_check')->join();
+})->join();
+q_check();
+
+exit(0);
+
+# EOF
diff --git a/dist/Thread-Queue/t/04_errs.t b/dist/Thread-Queue/t/04_errs.t
new file mode 100644
index 0000000000..3479c83c9e
--- /dev/null
+++ b/dist/Thread-Queue/t/04_errs.t
@@ -0,0 +1,75 @@
+use strict;
+use warnings;
+
+BEGIN {
+ if ($ENV{'PERL_CORE'}){
+ chdir('t');
+ unshift(@INC, '../lib');
+ }
+}
+
+use Thread::Queue;
+
+use Test::More 'tests' => 26;
+
+my $q = Thread::Queue->new(1..10);
+ok($q, 'New queue');
+
+eval { $q->dequeue(undef); };
+like($@, qr/Invalid 'count'/, $@);
+eval { $q->dequeue(0); };
+like($@, qr/Invalid 'count'/, $@);
+eval { $q->dequeue(0.5); };
+like($@, qr/Invalid 'count'/, $@);
+eval { $q->dequeue(-1); };
+like($@, qr/Invalid 'count'/, $@);
+eval { $q->dequeue('foo'); };
+like($@, qr/Invalid 'count'/, $@);
+
+eval { $q->dequeue_nb(undef); };
+like($@, qr/Invalid 'count'/, $@);
+eval { $q->dequeue_nb(0); };
+like($@, qr/Invalid 'count'/, $@);
+eval { $q->dequeue_nb(-0.5); };
+like($@, qr/Invalid 'count'/, $@);
+eval { $q->dequeue_nb(-1); };
+like($@, qr/Invalid 'count'/, $@);
+eval { $q->dequeue_nb('foo'); };
+like($@, qr/Invalid 'count'/, $@);
+
+eval { $q->peek(undef); };
+like($@, qr/Invalid 'index'/, $@);
+eval { $q->peek(3.3); };
+like($@, qr/Invalid 'index'/, $@);
+eval { $q->peek('foo'); };
+like($@, qr/Invalid 'index'/, $@);
+
+eval { $q->insert(); };
+like($@, qr/Invalid 'index'/, $@);
+eval { $q->insert(undef); };
+like($@, qr/Invalid 'index'/, $@);
+eval { $q->insert(.22); };
+like($@, qr/Invalid 'index'/, $@);
+eval { $q->insert('foo'); };
+like($@, qr/Invalid 'index'/, $@);
+
+eval { $q->extract(undef); };
+like($@, qr/Invalid 'index'/, $@);
+eval { $q->extract('foo'); };
+like($@, qr/Invalid 'index'/, $@);
+eval { $q->extract(1.1); };
+like($@, qr/Invalid 'index'/, $@);
+eval { $q->extract(0, undef); };
+like($@, qr/Invalid 'count'/, $@);
+eval { $q->extract(0, 0); };
+like($@, qr/Invalid 'count'/, $@);
+eval { $q->extract(0, 3.3); };
+like($@, qr/Invalid 'count'/, $@);
+eval { $q->extract(0, -1); };
+like($@, qr/Invalid 'count'/, $@);
+eval { $q->extract(0, 'foo'); };
+like($@, qr/Invalid 'count'/, $@);
+
+exit(0);
+
+# EOF
diff --git a/dist/Thread-Queue/t/05_extract.t b/dist/Thread-Queue/t/05_extract.t
new file mode 100644
index 0000000000..2773340932
--- /dev/null
+++ b/dist/Thread-Queue/t/05_extract.t
@@ -0,0 +1,78 @@
+use strict;
+use warnings;
+
+BEGIN {
+ if ($ENV{'PERL_CORE'}){
+ chdir('t');
+ unshift(@INC, '../lib');
+ }
+ use Config;
+ if (! $Config{'useithreads'}) {
+ print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
+ exit(0);
+ }
+}
+
+use threads;
+use Thread::Queue;
+
+if ($] == 5.008) {
+ require 't/test.pl'; # Test::More work-alike for Perl 5.8.0
+} else {
+ require Test::More;
+}
+Test::More->import();
+plan('tests' => 20);
+
+my $q = Thread::Queue->new(1..10);
+ok($q, 'New queue');
+
+threads->create(sub {
+ # Default count = 1
+ is($q->extract(), 1, 'No args'); # 2..10 left
+ is($q->extract(0), 2, 'Head'); # 3..10 left
+ is($q->extract(5), 8, 'Pos index'); # 3..7,9,10 left
+ is($q->extract(-3), 7, 'Neg index'); # 3..6,9,10 left
+ my $x = $q->extract(20); # unchanged
+ ok(! defined($x), 'Big index');
+ $x = $q->extract(-20); # unchanged
+ ok(! defined($x), 'Big neg index');
+})->join();
+
+$q = Thread::Queue->new(1..10);
+ok($q, 'New queue');
+
+threads->create(sub {
+ my @x = $q->extract(0, 2); # 3..10 left
+ is_deeply(\@x, [1,2], '2 from head');
+ @x = $q->extract(6, 2); # 3..8 left
+ is_deeply(\@x, [9,10], '2 from tail');
+ @x = $q->extract(2, 2); # 3,4,7,8 left
+ is_deeply(\@x, [5,6], '2 from middle');
+ @x = $q->extract(2, 4); # 3,4 left
+ is_deeply(\@x, [7,8], 'Lots from tail');
+ @x = $q->extract(3, 4); # unchanged
+ is_deeply(\@x, [], 'Too far');
+})->join();
+
+$q = Thread::Queue->new(1..10);
+ok($q, 'New queue');
+
+threads->create(sub {
+ my @x = $q->extract(-4, 2); # 1..6,9,10 left
+ is_deeply(\@x, [7,8], 'Neg index');
+ @x = $q->extract(-2, 4); # 1..6 left
+ is_deeply(\@x, [9,10], 'Lots from tail');
+ @x = $q->extract(-6, 2); # 3..6 left
+ is_deeply(\@x, [1,2], 'Max neg index');
+ @x = $q->extract(-10, 3); # unchanged
+ is_deeply(\@x, [], 'Too far');
+ @x = $q->extract(-6, 3); # 4..6 left
+ is_deeply(\@x, [3], 'Neg overlap');
+ @x = $q->extract(-5, 10); # empty
+ is_deeply(\@x, [4..6], 'Neg big overlap');
+})->join();
+
+exit(0);
+
+# EOF
diff --git a/dist/Thread-Queue/t/06_insert.t b/dist/Thread-Queue/t/06_insert.t
new file mode 100644
index 0000000000..4f9c2b47ec
--- /dev/null
+++ b/dist/Thread-Queue/t/06_insert.t
@@ -0,0 +1,106 @@
+use strict;
+use warnings;
+
+BEGIN {
+ if ($ENV{'PERL_CORE'}){
+ chdir('t');
+ unshift(@INC, '../lib');
+ }
+ use Config;
+ if (! $Config{'useithreads'}) {
+ print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
+ exit(0);
+ }
+}
+
+use threads;
+use Thread::Queue;
+
+if ($] == 5.008) {
+ require 't/test.pl'; # Test::More work-alike for Perl 5.8.0
+} else {
+ require Test::More;
+}
+Test::More->import();
+plan('tests' => 16);
+
+my $q = Thread::Queue->new(1..10);
+ok($q, 'New queue');
+
+threads->create(sub {
+ $q->insert(5);
+ $q->insert(-5);
+ $q->insert(100);
+ $q->insert(-100);
+})->join();
+
+my @x = $q->dequeue_nb(100);
+is_deeply(\@x, [1..10], 'No-op inserts');
+
+
+$q = Thread::Queue->new(1..10);
+ok($q, 'New queue');
+
+threads->create(sub {
+ $q->insert(10, qw/tail/);
+ $q->insert(0, qw/head/);
+})->join();
+
+@x = $q->dequeue_nb(100);
+is_deeply(\@x, ['head',1..10,'tail'], 'Edge inserts');
+
+
+$q = Thread::Queue->new(1..10);
+ok($q, 'New queue');
+
+threads->create(sub {
+ $q->insert(5, qw/foo bar/);
+ $q->insert(-2, qw/qux/);
+})->join();
+
+@x = $q->dequeue_nb(100);
+is_deeply(\@x, [1..5,'foo','bar',6..8,'qux',9,10], 'Middle inserts');
+
+
+$q = Thread::Queue->new(1..10);
+ok($q, 'New queue');
+
+threads->create(sub {
+ $q->insert(20, qw/tail/);
+ $q->insert(-20, qw/head/);
+})->join();
+
+@x = $q->dequeue_nb(100);
+is_deeply(\@x, ['head',1..10,'tail'], 'Extreme inserts');
+
+
+$q = Thread::Queue->new();
+ok($q, 'New queue');
+threads->create(sub { $q->insert(0, 1..3); })->join();
+@x = $q->dequeue_nb(100);
+is_deeply(\@x, [1..3], 'Empty queue insert');
+
+$q = Thread::Queue->new();
+ok($q, 'New queue');
+threads->create(sub { $q->insert(20, 1..3); })->join();
+@x = $q->dequeue_nb(100);
+is_deeply(\@x, [1..3], 'Empty queue insert');
+
+$q = Thread::Queue->new();
+ok($q, 'New queue');
+threads->create(sub { $q->insert(-1, 1..3); })->join();
+@x = $q->dequeue_nb(100);
+is_deeply(\@x, [1..3], 'Empty queue insert');
+
+$q = Thread::Queue->new();
+ok($q, 'New queue');
+threads->create(sub {
+ $q->insert(2, 1..3);
+ $q->insert(1, 'foo');
+})->join();
+@x = $q->dequeue_nb(100);
+is_deeply(\@x, [1,'foo',2,3], 'Empty queue insert');
+
+exit(0);
+
+# EOF
diff --git a/dist/Thread-Queue/t/07_lock.t b/dist/Thread-Queue/t/07_lock.t
new file mode 100644
index 0000000000..625159e97c
--- /dev/null
+++ b/dist/Thread-Queue/t/07_lock.t
@@ -0,0 +1,56 @@
+use strict;
+use warnings;
+
+BEGIN {
+ if ($ENV{'PERL_CORE'}){
+ chdir('t');
+ unshift(@INC, '../lib');
+ }
+ use Config;
+ if (! $Config{'useithreads'}) {
+ print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
+ exit(0);
+ }
+}
+
+use threads;
+use Thread::Queue;
+use Thread::Semaphore;
+
+if ($] == 5.008) {
+ require 't/test.pl'; # Test::More work-alike for Perl 5.8.0
+} else {
+ require Test::More;
+}
+Test::More->import();
+plan('tests' => 3);
+
+# The following tests locking a queue
+
+my $q = Thread::Queue->new(1..10);
+ok($q, 'New queue');
+
+my $sm = Thread::Semaphore->new(0);
+my $st = Thread::Semaphore->new(0);
+
+threads->create(sub {
+ {
+ lock($q);
+ $sm->up();
+ $st->down();
+ threads::yield();
+ select(undef, undef, undef, 0.1);
+ my @x = $q->extract(5,2);
+ is_deeply(\@x, [6,7], 'Thread dequeues under lock');
+ }
+})->detach();
+
+$sm->down();
+$st->up();
+my @x = $q->dequeue_nb(100);
+is_deeply(\@x, [1..5,8..10], 'Main dequeues');
+threads::yield();
+
+exit(0);
+
+# EOF
diff --git a/dist/Thread-Queue/t/08_nothreads.t b/dist/Thread-Queue/t/08_nothreads.t
new file mode 100644
index 0000000000..7ac43481b3
--- /dev/null
+++ b/dist/Thread-Queue/t/08_nothreads.t
@@ -0,0 +1,114 @@
+use strict;
+use warnings;
+
+BEGIN {
+ if ($ENV{'PERL_CORE'}){
+ chdir('t');
+ unshift(@INC, '../lib');
+ }
+}
+
+use Test::More 'tests' => 32;
+
+use Thread::Queue;
+
+# Regular array
+my @ary1 = qw/foo bar baz/;
+push(@ary1, [ 1..3 ], { 'qux' => 99 });
+
+# Shared array
+my @ary2 :shared = (99, 21, 86);
+
+# Regular hash-based object
+my $obj1 = {
+ 'foo' => 'bar',
+ 'qux' => 99,
+ 'biff' => [ qw/fee fi fo/ ],
+ 'boff' => { 'bork' => 'true' },
+};
+bless($obj1, 'Foo');
+
+# Shared hash-based object
+my $obj2 = &threads::shared::share({});
+$$obj2{'bar'} = 86;
+$$obj2{'key'} = 'foo';
+bless($obj2, 'Bar');
+
+# Scalar ref
+my $sref1 = \do{ my $scalar = 'foo'; };
+
+# Shared scalar ref object
+my $sref2 = \do{ my $scalar = 69; };
+threads::shared::share($sref2);
+bless($sref2, 'Baz');
+
+# Ref of ref
+my $foo = [ 5, 'bork', { 'now' => 123 } ];
+my $bar = \$foo;
+my $baz = \$bar;
+my $qux = \$baz;
+is_deeply($$$$qux, $foo, 'Ref of ref');
+
+# Queue up items
+my $q = Thread::Queue->new(\@ary1, \@ary2);
+ok($q, 'New queue');
+is($q->pending(), 2, 'Queue count');
+$q->enqueue($obj1, $obj2);
+is($q->pending(), 4, 'Queue count');
+$q->enqueue($sref1, $sref2, $qux);
+is($q->pending(), 7, 'Queue count');
+
+# Process items in queue
+{
+ is($q->pending(), 7, 'Queue count in thread');
+
+ my $ref = $q->peek(3);
+ is(ref($ref), 'Bar', 'Item is object');
+
+ my $tary1 = $q->dequeue();
+ ok($tary1, 'Thread got item');
+ is(ref($tary1), 'ARRAY', 'Item is array ref');
+ is_deeply($tary1, \@ary1, 'Complex array');
+
+ my $tary2 = $q->dequeue();
+ ok($tary2, 'Thread got item');
+ is(ref($tary2), 'ARRAY', 'Item is array ref');
+ for (my $ii=0; $ii < @ary2; $ii++) {
+ is($$tary2[$ii], $ary2[$ii], 'Shared array element check');
+ }
+
+ my $tobj1 = $q->dequeue();
+ ok($tobj1, 'Thread got item');
+ is(ref($tobj1), 'Foo', 'Item is object');
+ is_deeply($tobj1, $obj1, 'Object comparison');
+
+ my $tobj2 = $q->dequeue();
+ ok($tobj2, 'Thread got item');
+ is(ref($tobj2), 'Bar', 'Item is object');
+ is($$tobj2{'bar'}, 86, 'Shared object element check');
+ is($$tobj2{'key'}, 'foo', 'Shared object element check');
+
+ my $tsref1 = $q->dequeue();
+ ok($tsref1, 'Thread got item');
+ is(ref($tsref1), 'SCALAR', 'Item is scalar ref');
+ is($$tsref1, 'foo', 'Scalar ref contents');
+
+ my $tsref2 = $q->dequeue();
+ ok($tsref2, 'Thread got item');
+ is(ref($tsref2), 'Baz', 'Item is object');
+ is($$tsref2, 69, 'Shared scalar ref contents');
+
+ my $qux = $q->dequeue();
+ is_deeply($$$$qux, $foo, 'Ref of ref');
+
+ is($q->pending(), 0, 'Empty queue');
+ my $nothing = $q->dequeue_nb();
+ ok(! defined($nothing), 'Nothing on queue');
+}
+
+# Check results of thread's activities
+is($q->pending(), 0, 'Empty queue');
+
+exit(0);
+
+# EOF