summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorJarkko Hietaniemi <jhi@iki.fi>2002-07-12 23:44:17 +0000
committerJarkko Hietaniemi <jhi@iki.fi>2002-07-12 23:44:17 +0000
commit83272a45226e83bd136d713158e9b44ace2dbc8d (patch)
treeee348b24fc5020ccd9e375cef8a5eb8e2bcd3d77 /lib
parent484fdf61e8653b10160ba1e8011888f52ab6825a (diff)
downloadperl-83272a45226e83bd136d713158e9b44ace2dbc8d.tar.gz
threads::shared::queue and semaphore become Thread::Semaphore
and Queue. The 5005threads case where the old Semaphore and Queue.pm (they are disguised as .pmx) should get magically installed instead has not been tested. p4raw-id: //depot/perl@17509
Diffstat (limited to 'lib')
-rw-r--r--lib/Thread/Queue.pm90
-rw-r--r--lib/Thread/Queue.t71
-rw-r--r--lib/Thread/Semaphore.pm62
-rw-r--r--lib/Thread/Semaphore.t17
4 files changed, 135 insertions, 105 deletions
diff --git a/lib/Thread/Queue.pm b/lib/Thread/Queue.pm
index ebecb7433f..3b5c7c9301 100644
--- a/lib/Thread/Queue.pm
+++ b/lib/Thread/Queue.pm
@@ -1,44 +1,13 @@
package Thread::Queue;
+use threads::shared;
use strict;
-our $VERSION = '1.00';
-
-use Thread qw(cond_wait cond_broadcast);
-
-BEGIN {
- use Config;
- if ($Config{useithreads}) {
- require 'threads/shared/queue.pm';
- for my $meth (qw(new enqueue dequeue dequeue_nb pending)) {
- no strict 'refs';
- *{"Thread::Queue::$meth"} = \&{"threads::shared::queue::$meth"};
- }
- } elsif ($Config{use5005threads}) {
- for my $meth (qw(new enqueue dequeue dequeue_nb pending)) {
- no strict 'refs';
- *{"Thread::Queue::$meth"} = \&{"Thread::Queue::${meth}_othread"};
- }
- } else {
- require Carp;
- Carp::croak("This Perl has neither ithreads nor 5005threads");
- }
-}
-
+our $VERSION = '2.00';
=head1 NAME
-Thread::Queue - thread-safe queues (for old code only)
-
-=head1 CAVEAT
-
-For new code the use of the C<Thread::Queue> module is discouraged and
-the direct use of the C<threads>, C<threads::shared> and
-C<threads::shared::queue> modules is encouraged instead.
-
-For the whole story about the development of threads in Perl, and why you
-should B<not> be using this module unless you know what you're doing, see the
-CAVEAT of the C<Thread> module.
+Thread::Queue - thread-safe queues
=head1 SYNOPSIS
@@ -46,16 +15,16 @@ CAVEAT of the C<Thread> module.
my $q = new Thread::Queue;
$q->enqueue("foo", "bar");
my $foo = $q->dequeue; # The "bar" is still in the queue.
- my $foo = $q->dequeue_nb; # returns "bar", or undef if the queue was
- # empty
+ my $foo = $q->dequeue_nb; # returns "bar", or undef if the queue was empty
my $left = $q->pending; # returns the number of items still in the queue
=head1 DESCRIPTION
-A queue, as implemented by C<Thread::Queue> is a thread-safe data structure
-much like a list. Any number of threads can safely add elements to the end
-of the list, or remove elements from the head of the list. (Queues don't
-permit adding or removing elements from the middle of the list)
+A queue, as implemented by C<Thread::Queue> is a thread-safe
+data structure much like a list. Any number of threads can safely
+add elements to the end of the list, or remove elements from the head
+of the list. (Queues don't permit adding or removing elements from
+the middle of the list).
=head1 FUNCTIONS AND METHODS
@@ -68,7 +37,7 @@ The C<new> function creates a new empty queue.
=item enqueue LIST
The C<enqueue> method adds a list of scalars on to the end of the queue.
-The queue will grow as needed to accomodate the list.
+The queue will grow as needed to accommodate the list.
=item dequeue
@@ -85,45 +54,48 @@ C<undef>.
=item pending
-The C<pending> method returns the number of items still in the queue. (If
-there can be multiple readers on the queue it's best to lock the queue
-before checking to make sure that it stays in a consistent state)
+The C<pending> method returns the number of items still in the queue.
=back
=head1 SEE ALSO
-L<Thread>
+L<threads>, L<threads::shared>
=cut
-sub new_othread {
+sub new {
my $class = shift;
- return bless [@_], $class;
+ my @q : shared = @_;
+ return bless \@q, $class;
}
-sub dequeue_othread : locked : method {
+sub dequeue {
my $q = shift;
- cond_wait $q until @$q;
+ lock(@$q);
+ cond_wait @$q until @$q;
+ cond_signal @$q if @$q > 1;
return shift @$q;
}
-sub dequeue_nb_othread : locked : method {
- my $q = shift;
- if (@$q) {
+sub dequeue_nb {
+ my $q = shift;
+ lock(@$q);
return shift @$q;
- } else {
- return undef;
- }
}
-sub enqueue_othread : locked : method {
+sub enqueue {
my $q = shift;
- push(@$q, @_) and cond_broadcast $q;
+ lock(@$q);
+ push @$q, @_ and cond_signal @$q;
}
-sub pending_othread : locked : method {
- return scalar(@{(shift)});
+sub pending {
+ my $q = shift;
+ lock(@$q);
+ return scalar(@$q);
}
1;
+
+
diff --git a/lib/Thread/Queue.t b/lib/Thread/Queue.t
new file mode 100644
index 0000000000..33c420b750
--- /dev/null
+++ b/lib/Thread/Queue.t
@@ -0,0 +1,71 @@
+use warnings;
+
+BEGIN {
+ chdir 't' if -d 't';
+ push @INC ,'../lib';
+ require Config; import Config;
+ unless ($Config{'useithreads'}) {
+ print "1..0 # Skip: no ithreads\n";
+ exit 0;
+ }
+}
+
+use strict;
+use threads;
+use Thread::Queue;
+
+my $q = new Thread::Queue;
+$|++;
+print "1..26\n";
+
+my $test : shared = 1;
+
+sub ok {
+ lock($test);
+ print "ok $test\n";
+ $test++;
+}
+
+sub reader {
+ my $tid = threads->self->tid;
+ my $i = 0;
+ while (1) {
+ $i++;
+# print "reader (tid $tid): waiting for element $i...\n";
+ my $el = $q->dequeue;
+ ok();
+# print "ok $test\n"; $test++;
+# print "reader (tid $tid): dequeued element $i: value $el\n";
+ select(undef, undef, undef, rand(1));
+ if ($el == -1) {
+ # end marker
+# print "reader (tid $tid) returning\n";
+ return;
+ }
+ }
+}
+
+my $nthreads = 5;
+my @threads;
+
+for (my $i = 0; $i < $nthreads; $i++) {
+ push @threads, threads->new(\&reader, $i);
+}
+
+for (my $i = 1; $i <= 20; $i++) {
+ my $el = int(rand(100));
+ select(undef, undef, undef, rand(1));
+# print "writer: enqueuing value $el\n";
+ $q->enqueue($el);
+}
+
+$q->enqueue((-1) x $nthreads); # one end marker for each thread
+
+for(@threads) {
+# print "waiting for join\n";
+ $_->join();
+}
+ok();
+#print "ok $test\n";
+
+
diff --git a/lib/Thread/Semaphore.pm b/lib/Thread/Semaphore.pm
index 51cc0c6fef..d3ebe637a7 100644
--- a/lib/Thread/Semaphore.pm
+++ b/lib/Thread/Semaphore.pm
@@ -1,44 +1,12 @@
package Thread::Semaphore;
-use strict;
-
-our $VERSION = '1.00';
-
-use Thread qw(cond_wait cond_broadcast);
-
-BEGIN {
- use Config;
- if ($Config{useithreads}) {
- require 'threads/shared/semaphore.pm';
- for my $meth (qw(new up down)) {
- no strict 'refs';
- *{"Thread::Semaphore::$meth"} = \&{"threads::shared::semaphore::$meth"};
- }
- } elsif ($Config{use5005threads}) {
- for my $meth (qw(new up down)) {
- no strict 'refs';
- *{"Thread::Semaphore::$meth"} = \&{"Thread::Semaphore::${meth}_othread"};
- }
- } else {
- require Carp;
- Carp::croak("This Perl has neither ithreads nor 5005threads");
- }
-}
+use threads::shared;
+our $VERSION = '2.00';
=head1 NAME
-Thread::Semaphore - thread-safe semaphores (for old code only)
-
-=head1 CAVEAT
-
-For new code the use of the C<Thread::Semaphore> module is discouraged and
-the direct use of the C<threads>, C<threads::shared> and
-C<threads::shared::semaphore> modules is encouraged instead.
-
-For the whole story about the development of threads in Perl, and why you
-should B<not> be using this module unless you know what you're doing, see the
-CAVEAT of the C<Thread> module.
+Thread::Semaphore - thread-safe semaphores
=head1 SYNOPSIS
@@ -60,8 +28,8 @@ unlike locks, aren't tied to particular scalars, and so may be used to
control access to anything you care to use them for.
Semaphores don't limit their values to zero or one, so they can be used to
-control access to some resource that may have more than one of. (For
-example, filehandles) Increment and decrement amounts aren't fixed at one
+control access to some resource that there may be more than one of. (For
+example, filehandles). Increment and decrement amounts aren't fixed at one
either, so threads can reserve or return multiple resources at once.
=head1 FUNCTIONS AND METHODS
@@ -80,7 +48,7 @@ number. If no number is passed, the semaphore's count is set to one.
=item down NUMBER
The C<down> method decreases the semaphore's count by the specified number,
-or one if no number has been specified. If the semaphore's count would drop
+or by one if no number has been specified. If the semaphore's count would drop
below zero, this method will block until such time that the semaphore's
count is equal to or larger than the amount you're C<down>ing the
semaphore's count by.
@@ -90,31 +58,33 @@ semaphore's count by.
=item up NUMBER
The C<up> method increases the semaphore's count by the number specified,
-or one if no number's been specified. This will unblock any thread blocked
+or by one if no number has been specified. This will unblock any thread blocked
trying to C<down> the semaphore if the C<up> raises the semaphore count
-above what the C<down>s are trying to decrement it by.
+above the amount that the C<down>s are trying to decrement it by.
=back
=cut
-sub new_othread {
+sub new {
my $class = shift;
- my $val = @_ ? shift : 1;
+ my $val : shared = @_ ? shift : 1;
bless \$val, $class;
}
-sub down_othread : locked : method {
+sub down {
my $s = shift;
+ lock($$s);
my $inc = @_ ? shift : 1;
- cond_wait $s until $$s >= $inc;
+ cond_wait $$s until $$s >= $inc;
$$s -= $inc;
}
-sub up_othread : locked : method {
+sub up {
my $s = shift;
+ lock($$s);
my $inc = @_ ? shift : 1;
- ($$s += $inc) > 0 and cond_broadcast $s;
+ ($$s += $inc) > 0 and cond_broadcast $$s;
}
1;
diff --git a/lib/Thread/Semaphore.t b/lib/Thread/Semaphore.t
new file mode 100644
index 0000000000..14687e0b22
--- /dev/null
+++ b/lib/Thread/Semaphore.t
@@ -0,0 +1,17 @@
+use warnings;
+
+BEGIN {
+ chdir 't' if -d 't';
+ push @INC ,'../lib';
+ require Config; import Config;
+ unless ($Config{'useithreads'}) {
+ print "1..0 # Skip: no ithreads\n";
+ exit 0;
+ }
+}
+
+print "1..1\n";
+use threads;
+use Thread::Semaphore;
+print "ok 1\n";
+