summaryrefslogtreecommitdiff
path: root/dist/Thread-Queue
diff options
context:
space:
mode:
authorJerry D. Hedden <jdhedden@cpan.org>2015-08-22 16:40:50 -0400
committerJames E Keenan <jkeenan@cpan.org>2015-08-22 19:21:35 -0400
commite128eaa17ab039e9db53073c7ac6c5093b3628d9 (patch)
treeb3641adc9abf6f284902109bc52891d87693adbb /dist/Thread-Queue
parent4442630f9d575dd61a1ec4fc6b3dccd6756e9697 (diff)
downloadperl-e128eaa17ab039e9db53073c7ac6c5093b3628d9.tar.gz
Upgrade to Thread::Queue 3.06
For: RT #125864
Diffstat (limited to 'dist/Thread-Queue')
-rw-r--r--dist/Thread-Queue/lib/Thread/Queue.pm42
-rw-r--r--dist/Thread-Queue/t/07_lock.t6
-rw-r--r--dist/Thread-Queue/t/11_limit.t101
3 files changed, 140 insertions, 9 deletions
diff --git a/dist/Thread-Queue/lib/Thread/Queue.pm b/dist/Thread-Queue/lib/Thread/Queue.pm
index 316644a64f..ebc1c31749 100644
--- a/dist/Thread-Queue/lib/Thread/Queue.pm
+++ b/dist/Thread-Queue/lib/Thread/Queue.pm
@@ -3,7 +3,7 @@ package Thread::Queue;
use strict;
use warnings;
-our $VERSION = '3.05';
+our $VERSION = '3.06';
$VERSION = eval $VERSION;
use threads::shared 1.21;
@@ -26,14 +26,29 @@ sub enqueue
{
my $self = shift;
lock(%$self);
+
if ($$self{'ENDED'}) {
require Carp;
Carp::croak("'enqueue' method called on queue that has been 'end'ed");
}
- push(@{$$self{'queue'}}, map { shared_clone($_) } @_)
+
+ # Block if queue size exceeds any specified limit
+ my $queue = $$self{'queue'};
+ cond_wait(%$self) while ($$self{'LIMIT'} && (@$queue >= $$self{'LIMIT'}));
+
+ # Add items to queue, and then signal other threads
+ push(@$queue, map { shared_clone($_) } @_)
and cond_signal(%$self);
}
+# Set or return the max. size for a queue
+sub limit : lvalue
+{
+ my $self = shift;
+ lock(%$self);
+ $$self{'LIMIT'};
+}
+
# Return a count of the number of items on a queue
sub pending
{
@@ -47,7 +62,7 @@ sub pending
sub end
{
my $self = shift;
- lock $self;
+ lock(%$self);
# No more data is coming
$$self{'ENDED'} = 1;
# Try to release at least one blocked thread
@@ -289,7 +304,7 @@ Thread::Queue - Thread-safe queues
=head1 VERSION
-This document describes Thread::Queue version 3.05
+This document describes Thread::Queue version 3.06
=head1 SYNOPSIS
@@ -334,6 +349,9 @@ This document describes Thread::Queue version 3.05
# Work on $item
}
+ # Set a size for a queue
+ $q->limit = 5;
+
# Get the second item in the queue without dequeuing anything
my $item = $q->peek(1);
@@ -423,7 +441,7 @@ Adds a list of items onto the end of the queue.
Removes the requested number of items (default is 1) from the head of the
queue, and returns them. If the queue contains fewer than the requested
number of items, then the thread will be blocked until the requisite number
-of items are available (i.e., until other threads <enqueue> more items).
+of items are available (i.e., until other threads C<enqueue> more items).
=item ->dequeue_nb()
@@ -461,6 +479,20 @@ behaves the same as C<dequeue_nb>.
Returns the number of items still in the queue. Returns C<undef> if the queue
has been ended (see below), and there are no more items in the queue.
+=item ->limit
+
+Sets the size of the queue. If set, calls to C<enqueue()> will block until
+the number of pending items in the queue drops below the C<limit>. The
+C<limit> does not prevent enqueuing items beyond that count:
+
+ my $q = Thread::Queue->new(1, 2);
+ $q->limit = 4;
+ $q->enqueue(3, 4, 5); # Does not block
+ $q->enqueue(6); # Blocks until at least 2 items are dequeued
+
+ my $size = $q->limit; # Returns the current limit (may return 'undef')
+ $q->limit = 0; # Queue size is now unlimited
+
=item ->end()
Declares that no more items will be added to the queue.
diff --git a/dist/Thread-Queue/t/07_lock.t b/dist/Thread-Queue/t/07_lock.t
index 0af2db1629..f9e258e092 100644
--- a/dist/Thread-Queue/t/07_lock.t
+++ b/dist/Thread-Queue/t/07_lock.t
@@ -29,7 +29,7 @@ ok($q, 'New queue');
my $sm = Thread::Semaphore->new(0);
my $st = Thread::Semaphore->new(0);
-my $thread = threads->create(sub {
+threads->create(sub {
{
lock($q);
$sm->up();
@@ -39,7 +39,7 @@ my $thread = threads->create(sub {
my @x = $q->extract(5,2);
is_deeply(\@x, [6,7], 'Thread dequeues under lock');
}
-});
+})->detach();
$sm->down();
$st->up();
@@ -47,8 +47,6 @@ my @x = $q->dequeue_nb(100);
is_deeply(\@x, [1..5,8..10], 'Main dequeues');
threads::yield();
-$thread->join;
-
exit(0);
# EOF
diff --git a/dist/Thread-Queue/t/11_limit.t b/dist/Thread-Queue/t/11_limit.t
new file mode 100644
index 0000000000..a2ab91859a
--- /dev/null
+++ b/dist/Thread-Queue/t/11_limit.t
@@ -0,0 +1,101 @@
+use strict;
+use warnings;
+
+use Config;
+
+BEGIN {
+ if (! $Config{'useithreads'}) {
+ print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
+ exit(0);
+ }
+ if (! $Config{'d_select'}) {
+ print("1..0 # SKIP 'select()' not available for testing\n");
+ exit(0);
+ }
+}
+
+use threads;
+use Thread::Queue;
+
+use Test::More;
+
+plan tests => 8;
+
+my $q = Thread::Queue->new();
+my $rpt = Thread::Queue->new();
+
+my $th = threads->create( sub {
+ # (1) Set queue limit, and report it
+ $q->limit = 3;
+ $rpt->enqueue($q->limit);
+
+ # (3) Fetch an item from queue
+ my $item = $q->dequeue();
+ is($item, 1, 'Dequeued item 1');
+ # Report queue count
+ $rpt->enqueue($q->pending());
+
+ # q = (2, 3, 4, 5); r = (4)
+
+ # (4) Enqueue more items - will block
+ $q->enqueue(6, 7);
+ # q = (5, 'foo', 6, 7); r = (4, 3, 4, 3)
+
+ # (6) Get reports from main
+ my @items = $rpt->dequeue(5);
+ is_deeply(\@items, [4, 3, 4, 3, 'go'], 'Queue reports');
+
+ # Dequeue all items
+ @items = $q->dequeue_nb(99);
+ is_deeply(\@items, [5, 'foo', 6, 7], 'Queue items');
+});
+
+# (2) Read queue limit from thread
+my $item = $rpt->dequeue();
+is($item, $q->limit, 'Queue limit set');
+# Send items
+$q->enqueue(1, 2, 3, 4, 5);
+
+# (5) Read queue count
+$item = $rpt->dequeue;
+# q = (2, 3, 4, 5); r = ()
+is($item, $q->pending(), 'Queue count');
+# Report back the queue count
+$rpt->enqueue($q->pending);
+# q = (2, 3, 4, 5); r = (4)
+
+# Read an item from queue
+$item = $q->dequeue();
+is($item, 2, 'Dequeued item 2');
+# q = (3, 4, 5); r = (4)
+# Report back the queue count
+$rpt->enqueue($q->pending);
+# q = (3, 4, 5); r = (4, 3)
+
+# 'insert' doesn't care about queue limit
+$q->insert(3, 'foo');
+$rpt->enqueue($q->pending);
+# q = (3, 4, 5, 'foo'); r = (4, 3, 4)
+
+# Read an item from queue
+$item = $q->dequeue();
+is($item, 3, 'Dequeued item 3');
+# q = (3, 4, 5); r = (4)
+# Report back the queue count
+$rpt->enqueue($q->pending);
+# q = (4, 5, 'foo'); r = (4, 3, 4, 3)
+
+# Read an item from queue
+$item = $q->dequeue();
+is($item, 4, 'Dequeued item 4');
+# Thread is now unblocked
+
+# Handshake with thread
+$rpt->enqueue('go');
+
+# (7) - Done
+$th->join;
+
+exit(0);
+
+# EOF