summaryrefslogtreecommitdiff
path: root/dist/Thread-Queue
diff options
context:
space:
mode:
authorJames E Keenan <jkeenan@cpan.org>2017-02-14 14:04:54 -0500
committerJames E Keenan <jkeenan@cpan.org>2017-02-14 14:04:54 -0500
commit5376c9c46602f4cb47b306edc76c03bc8500e61f (patch)
treec98407978f8341e40abd445e6c5e711c863973eb /dist/Thread-Queue
parentca81c32aad9c602582b8f87a832594c8712bf6a6 (diff)
downloadperl-5376c9c46602f4cb47b306edc76c03bc8500e61f.tar.gz
Revert "Upgrade to Thread::Queue 3.12"
This reverts commit 57c819f845c985ed9979bfa76b1b8ca1708370f0. Reverting to give us time to explore possible race condition. See: https://rt.perl.org/Ticket/Display.html?id=130777
Diffstat (limited to 'dist/Thread-Queue')
-rw-r--r--dist/Thread-Queue/lib/Thread/Queue.pm43
-rw-r--r--dist/Thread-Queue/t/01_basic.t2
-rw-r--r--dist/Thread-Queue/t/02_refs.t2
-rw-r--r--dist/Thread-Queue/t/03_peek.t2
-rw-r--r--dist/Thread-Queue/t/05_extract.t2
-rw-r--r--dist/Thread-Queue/t/06_insert.t2
-rw-r--r--dist/Thread-Queue/t/07_lock.t2
-rw-r--r--dist/Thread-Queue/t/10_timed.t2
-rw-r--r--dist/Thread-Queue/t/11_limit.t48
9 files changed, 25 insertions, 80 deletions
diff --git a/dist/Thread-Queue/lib/Thread/Queue.pm b/dist/Thread-Queue/lib/Thread/Queue.pm
index c0d2180653..9f896b72ea 100644
--- a/dist/Thread-Queue/lib/Thread/Queue.pm
+++ b/dist/Thread-Queue/lib/Thread/Queue.pm
@@ -3,7 +3,7 @@ package Thread::Queue;
use strict;
use warnings;
-our $VERSION = '3.12';
+our $VERSION = '3.11';
$VERSION = eval $VERSION;
use threads::shared 1.21;
@@ -65,8 +65,8 @@ sub end
lock(%$self);
# No more data is coming
$$self{'ENDED'} = 1;
-
- cond_signal(%$self); # Unblock possibly waiting threads
+ # Try to release at least one blocked thread
+ cond_signal(%$self);
}
# Return 1 or more items from the head of a queue, blocking if needed
@@ -80,21 +80,17 @@ sub dequeue
# Wait for requisite number of items
cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
+ cond_signal(%$self) if ((@$queue >= $count) || $$self{'ENDED'});
# If no longer blocking, try getting whatever is left on the queue
return $self->dequeue_nb($count) if ($$self{'ENDED'});
# Return single item
- if ($count == 1) {
- my $item = shift(@$queue);
- cond_signal(%$self); # Unblock possibly waiting threads
- return $item;
- }
+ return shift(@$queue) if ($count == 1);
# Return multiple items
my @items;
push(@items, shift(@$queue)) for (1..$count);
- cond_signal(%$self); # Unblock possibly waiting threads
return @items;
}
@@ -108,11 +104,7 @@ sub dequeue_nb
my $count = @_ ? $self->_validate_count(shift) : 1;
# Return single item
- if ($count == 1) {
- my $item = shift(@$queue);
- cond_signal(%$self); # Unblock possibly waiting threads
- return $item;
- }
+ return shift(@$queue) if ($count == 1);
# Return multiple items
my @items;
@@ -120,7 +112,6 @@ sub dequeue_nb
last if (! @$queue);
push(@items, shift(@$queue));
}
- cond_signal(%$self); # Unblock possibly waiting threads
return @items;
}
@@ -144,6 +135,7 @@ sub dequeue_timed
while ((@$queue < $count) && ! $$self{'ENDED'}) {
last if (! cond_timedwait(%$self, $timeout));
}
+ cond_signal(%$self) if ((@$queue >= $count) || $$self{'ENDED'});
# Get whatever we need off the queue if available
return $self->dequeue_nb($count);
@@ -195,7 +187,8 @@ sub insert
# Add previous items back onto the queue
push(@$queue, @tmp);
- cond_signal(%$self); # Unblock possibly waiting threads
+ # Soup's up
+ cond_signal(%$self);
}
# Remove items from anywhere in a queue
@@ -213,7 +206,7 @@ sub extract
$index += @$queue;
if ($index < 0) {
$count += $index;
- return if ($count <= 0); # Beyond the head of the queue
+ return if ($count <= 0); # Beyond the head of the queue
return $self->dequeue_nb($count); # Extract from the head
}
}
@@ -231,8 +224,6 @@ sub extract
# Add back any removed items
push(@$queue, @tmp);
- cond_signal(%$self); # Unblock possibly waiting threads
-
# Return single item
return $items[0] if ($count == 1);
@@ -272,19 +263,14 @@ sub _validate_count
if (! defined($count) ||
! looks_like_number($count) ||
(int($count) != $count) ||
- ($count < 1) ||
- ($$self{'LIMIT'} && $count > $$self{'LIMIT'}))
+ ($count < 1))
{
require Carp;
my ($method) = (caller(1))[3];
my $class_name = ref($self);
$method =~ s/$class_name\:://;
$count = 'undef' if (! defined($count));
- if ($$self{'LIMIT'} && $count > $$self{'LIMIT'}) {
- Carp::croak("'count' argument ($count) to '$method' method exceeds queue size limit ($$self{'LIMIT'})");
- } else {
- Carp::croak("Invalid 'count' argument ($count) to '$method' method");
- }
+ Carp::croak("Invalid 'count' argument ($count) to '$method' method");
}
return $count;
@@ -318,7 +304,7 @@ Thread::Queue - Thread-safe queues
=head1 VERSION
-This document describes Thread::Queue version 3.12
+This document describes Thread::Queue version 3.11
=head1 SYNOPSIS
@@ -508,9 +494,6 @@ C<limit> does not prevent enqueuing items beyond that count:
# 'undef')
$q->limit = 0; # Queue size is now unlimited
-Calling any of the dequeue methods with C<COUNT> greater than a queue's
-C<limit> will generate an error.
-
=item ->end()
Declares that no more items will be added to the queue.
diff --git a/dist/Thread-Queue/t/01_basic.t b/dist/Thread-Queue/t/01_basic.t
index 2983f0b700..4ec51957ae 100644
--- a/dist/Thread-Queue/t/01_basic.t
+++ b/dist/Thread-Queue/t/01_basic.t
@@ -13,7 +13,7 @@ use threads;
use Thread::Queue;
if ($] == 5.008) {
- require 't/test.pl'; # Test::More work-alike for Perl 5.8.0
+ require './t/test.pl'; # Test::More work-alike for Perl 5.8.0
} else {
require Test::More;
}
diff --git a/dist/Thread-Queue/t/02_refs.t b/dist/Thread-Queue/t/02_refs.t
index 0cebdc1db3..fdf8f6bad2 100644
--- a/dist/Thread-Queue/t/02_refs.t
+++ b/dist/Thread-Queue/t/02_refs.t
@@ -14,7 +14,7 @@ use threads::shared;
use Thread::Queue;
if ($] == 5.008) {
- require 't/test.pl'; # Test::More work-alike for Perl 5.8.0
+ require './t/test.pl'; # Test::More work-alike for Perl 5.8.0
} else {
require Test::More;
}
diff --git a/dist/Thread-Queue/t/03_peek.t b/dist/Thread-Queue/t/03_peek.t
index d543b59469..29ef75e7fe 100644
--- a/dist/Thread-Queue/t/03_peek.t
+++ b/dist/Thread-Queue/t/03_peek.t
@@ -13,7 +13,7 @@ use threads;
use Thread::Queue;
if ($] == 5.008) {
- require 't/test.pl'; # Test::More work-alike for Perl 5.8.0
+ require './t/test.pl'; # Test::More work-alike for Perl 5.8.0
} else {
require Test::More;
}
diff --git a/dist/Thread-Queue/t/05_extract.t b/dist/Thread-Queue/t/05_extract.t
index de0e78bfd0..d8cb417be9 100644
--- a/dist/Thread-Queue/t/05_extract.t
+++ b/dist/Thread-Queue/t/05_extract.t
@@ -13,7 +13,7 @@ use threads;
use Thread::Queue;
if ($] == 5.008) {
- require 't/test.pl'; # Test::More work-alike for Perl 5.8.0
+ require './t/test.pl'; # Test::More work-alike for Perl 5.8.0
} else {
require Test::More;
}
diff --git a/dist/Thread-Queue/t/06_insert.t b/dist/Thread-Queue/t/06_insert.t
index 4f9d1dff5e..93617e13a3 100644
--- a/dist/Thread-Queue/t/06_insert.t
+++ b/dist/Thread-Queue/t/06_insert.t
@@ -13,7 +13,7 @@ use threads;
use Thread::Queue;
if ($] == 5.008) {
- require 't/test.pl'; # Test::More work-alike for Perl 5.8.0
+ require './t/test.pl'; # Test::More work-alike for Perl 5.8.0
} else {
require Test::More;
}
diff --git a/dist/Thread-Queue/t/07_lock.t b/dist/Thread-Queue/t/07_lock.t
index b20e0604ca..633722103c 100644
--- a/dist/Thread-Queue/t/07_lock.t
+++ b/dist/Thread-Queue/t/07_lock.t
@@ -14,7 +14,7 @@ use Thread::Queue;
use Thread::Semaphore;
if ($] == 5.008) {
- require 't/test.pl'; # Test::More work-alike for Perl 5.8.0
+ require './t/test.pl'; # Test::More work-alike for Perl 5.8.0
} else {
require Test::More;
}
diff --git a/dist/Thread-Queue/t/10_timed.t b/dist/Thread-Queue/t/10_timed.t
index 8404720ed6..da8b03a7eb 100644
--- a/dist/Thread-Queue/t/10_timed.t
+++ b/dist/Thread-Queue/t/10_timed.t
@@ -13,7 +13,7 @@ use threads;
use Thread::Queue;
if ($] == 5.008) {
- require 't/test.pl'; # Test::More work-alike for Perl 5.8.0
+ require './t/test.pl'; # Test::More work-alike for Perl 5.8.0
} else {
require Test::More;
}
diff --git a/dist/Thread-Queue/t/11_limit.t b/dist/Thread-Queue/t/11_limit.t
index b84fcc3662..1bd88b39a1 100644
--- a/dist/Thread-Queue/t/11_limit.t
+++ b/dist/Thread-Queue/t/11_limit.t
@@ -19,7 +19,7 @@ use Thread::Queue;
use Test::More;
-plan tests => 13;
+plan tests => 8;
my $q = Thread::Queue->new();
my $rpt = Thread::Queue->new();
@@ -82,12 +82,12 @@ $rpt->enqueue($q->pending);
# q = (4, 5, 'foo'); r = (4, 3, 4, 3)
# Read all items from queue
-my @items = $q->dequeue(3);
-is_deeply(\@items, [4, 5, 'foo'], 'Dequeued 3 items');
+my @item = $q->dequeue(3);
+is_deeply(\@item, [4, 5, 'foo'], 'Dequeued 3 items');
# Thread is now unblocked
-@items = $q->dequeue(2);
-is_deeply(\@items, [6, 7], 'Dequeued 2 items');
+@item = $q->dequeue(2);
+is_deeply(\@item, [6, 7], 'Dequeued 2 items');
# Thread is now unblocked
# Handshake with thread
@@ -96,44 +96,6 @@ $rpt->enqueue('go');
# (7) - Done
$th->join;
-# It's an error to call dequeue methods with COUNT > LIMIT
-eval { $q->dequeue(5); };
-like($@, qr/exceeds queue size limit/, $@);
-
-# Bug #120157
-# Fix deadlock from combination of dequeue_nb, enqueue and queue size limit
-
-# (1) Fill queue
-$q->enqueue(1..3);
-is($q->pending, 3, 'Queue loaded');
-
-$th = threads->create( sub {
- $rpt->enqueue('go');
-
- # (3) Thread blocks trying to add to full queue
- $q->enqueue(99);
-
- # (5) Thread exits
- $rpt->enqueue('done');
-});
-
-# (2) Wait for thread to block on enqueue() call
-is($rpt->dequeue(), 'go', 'Thread blocked');
-threads->yield();
-sleep(1);
-
-# (4) Dequeue items - this will cause thread to unblock
-@items = ();
-while (my $item = $q->dequeue_nb()) {
- push(@items, $item);
- threads->yield();
-}
-is_deeply(\@items, [1,2,3,99], 'Dequeued items');
-
-# (6) - Done
-$th->join();
-is($rpt->dequeue(), 'done', 'Thread exited');
-
exit(0);
# EOF