summaryrefslogtreecommitdiff
path: root/dist/Thread-Queue
diff options
context:
space:
mode:
authorJerry D. Hedden <jdhedden@cpan.org>2013-02-19 12:23:46 -0800
committerChris 'BinGOs' Williams <chris@bingosnet.co.uk>2013-02-19 22:21:01 +0000
commitc7bac10acd8724f859ddcb81408cfdbeee046f8f (patch)
tree74539c7e27757872430c3a5da17bdf04340774b1 /dist/Thread-Queue
parent0e18027d899d188a3a156a060d56beb9bfa7ddae (diff)
downloadperl-c7bac10acd8724f859ddcb81408cfdbeee046f8f.tar.gz
[perl #116865] Upgrade to Thread::Queue 3.02
Attached patch updates Thread::Queue to v3.0.2 in preparation for a CPAN release. Adds a new method to Thread::Queue to dequeue items with a timeout feature. This addition was suggested by Andreas Huber. Signed-off-by: Chris 'BinGOs' Williams <chris@bingosnet.co.uk>
Diffstat (limited to 'dist/Thread-Queue')
-rw-r--r--dist/Thread-Queue/lib/Thread/Queue.pm75
-rw-r--r--dist/Thread-Queue/t/10_timed.t66
2 files changed, 138 insertions, 3 deletions
diff --git a/dist/Thread-Queue/lib/Thread/Queue.pm b/dist/Thread-Queue/lib/Thread/Queue.pm
index 0bf16244b2..027dd56c8a 100644
--- a/dist/Thread-Queue/lib/Thread/Queue.pm
+++ b/dist/Thread-Queue/lib/Thread/Queue.pm
@@ -3,7 +3,7 @@ package Thread::Queue;
use strict;
use warnings;
-our $VERSION = '3.01';
+our $VERSION = '3.02';
$VERSION = eval $VERSION;
use threads::shared 1.21;
@@ -13,7 +13,7 @@ use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
our @CARP_NOT = ("threads::shared");
# Predeclarations for internal functions
-my ($validate_count, $validate_index);
+my ($validate_count, $validate_index, $validate_timeout);
# Create a new queue possibly pre-populated with items
sub new
@@ -103,6 +103,32 @@ sub dequeue_nb
return @items;
}
+# Return items from the head of a queue, blocking if needed up to a timeout
+sub dequeue_timed
+{
+ my $self = shift;
+ lock(%$self);
+ my $queue = $$self{'queue'};
+
+ # Timeout may be relative or absolute
+ my $timeout = @_ ? $validate_timeout->(shift) : -1;
+ # Convert to an absolute time for use with cond_timedwait()
+ if ($timeout < 32000000) { # More than one year
+ $timeout += time();
+ }
+
+ my $count = @_ ? $validate_count->(shift) : 1;
+
+ # Wait for requisite number of items, or until timeout
+ while ((@$queue < $count) && ! $$self{'ENDED'}) {
+ last if (! cond_timedwait(%$self, $timeout));
+ }
+ cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
+
+ # Get whatever we need off the queue if available
+ return $self->dequeue_nb($count);
+}
+
# Return an item without removing it from a queue
sub peek
{
@@ -232,6 +258,23 @@ $validate_count = sub {
return $count;
};
+# Check value of the requested timeout
+$validate_timeout = sub {
+ my $timeout = shift;
+
+ if (! defined($timeout) ||
+ ! looks_like_number($timeout))
+ {
+ require Carp;
+ my ($method) = (caller(1))[3];
+ $method =~ s/Thread::Queue:://;
+ $timeout = 'undef' if (! defined($timeout));
+ Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method");
+ }
+
+ return $timeout;
+};
+
1;
=head1 NAME
@@ -240,7 +283,7 @@ Thread::Queue - Thread-safe queues
=head1 VERSION
-This document describes Thread::Queue version 3.01
+This document describes Thread::Queue version 3.02
=head1 SYNOPSIS
@@ -280,6 +323,11 @@ This document describes Thread::Queue version 3.01
# Work on $item
}
+ # Blocking dequeue with 5-second timeout
+ if (defined(my $item = $q->dequeue_timed(5))) {
+ # Work on $item
+ }
+
# Get the second item in the queue without dequeuing anything
my $item = $q->peek(1);
@@ -381,6 +429,27 @@ number of items, then it immediately (i.e., non-blocking) returns whatever
items there are on the queue. If the queue is empty, then C<undef> is
returned.
+=item ->dequeue_timed(TIMEOUT)
+
+=item ->dequeue_timed(TIMEOUT, COUNT)
+
+Removes the requested number of items (default is 1) from the head of the
+queue, and returns them. If the queue contains fewer than the requested
+number of items, then the thread will be blocked until the requisite number of
+items are available, or until the timeout is reached. If the timeout is
+reached, it returns whatever items there are on the queue, or C<undef> if the
+queue is empty.
+
+The timeout may be a number of seconds relative to the current time (e.g., 5
+seconds from when the call is made), or may be an absolute timeout in I<epoch>
+seconds the same as would be used with
+L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">.
+Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
+the underlying implementation).
+
+If C<TIMEOUT> is missing, c<undef>, or less than or equal to 0, then this call
+behaves the same as C<dequeue_nb>.
+
=item ->pending()
Returns the number of items still in the queue. Returns C<undef> if the queue
diff --git a/dist/Thread-Queue/t/10_timed.t b/dist/Thread-Queue/t/10_timed.t
new file mode 100644
index 0000000000..8404720ed6
--- /dev/null
+++ b/dist/Thread-Queue/t/10_timed.t
@@ -0,0 +1,66 @@
+use strict;
+use warnings;
+
+BEGIN {
+ use Config;
+ if (! $Config{'useithreads'}) {
+ print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
+ exit(0);
+ }
+}
+
+use threads;
+use Thread::Queue;
+
+if ($] == 5.008) {
+ require 't/test.pl'; # Test::More work-alike for Perl 5.8.0
+} else {
+ require Test::More;
+}
+Test::More->import();
+plan('tests' => 19);
+
+### ->dequeue_timed(TIMEOUT, COUNT) test ###
+
+my $q = Thread::Queue->new();
+ok($q, 'New queue');
+
+my @items = qw/foo bar baz qux exit/;
+$q->enqueue(@items);
+is($q->pending(), scalar(@items), 'Queue count');
+
+threads->create(sub {
+ is($q->pending(), scalar(@items), 'Queue count in thread');
+ while (my @el = $q->dequeue_timed(2.5, 2)) {
+ is($el[0], shift(@items), "Thread got $el[0]");
+ if ($el[0] eq 'exit') {
+ is(scalar(@el), 1, 'Thread to exit');
+ } else {
+ is($el[1], shift(@items), "Thread got $el[1]");
+ }
+ }
+ is($q->pending(), 0, 'Empty queue');
+ $q->enqueue('done');
+})->join();
+
+is($q->pending(), 1, 'Queue count after thread');
+is($q->dequeue(), 'done', 'Thread reported done');
+is($q->pending(), 0, 'Empty queue');
+
+### ->dequeue_timed(TIMEOUT) test on empty queue ###
+
+threads->create(sub {
+ is($q->pending(), 0, 'Empty queue in thread');
+ my @el = $q->dequeue_timed(1.5);
+ is($el[0], undef, "Thread got no items");
+ is($q->pending(), 0, 'Empty queue in thread');
+ $q->enqueue('done');
+})->join();
+
+is($q->pending(), 1, 'Queue count after thread');
+is($q->dequeue(), 'done', 'Thread reported done');
+is($q->pending(), 0, 'Empty queue');
+
+exit(0);
+
+# EOF