diff options
author | Artur Bergman <sky@nanisky.com> | 2002-05-12 17:59:41 +0000 |
---|---|---|
committer | Artur Bergman <sky@nanisky.com> | 2002-05-12 17:59:41 +0000 |
commit | 9c6f85782f1cd7b0fd0a41187c4ee95878c501d4 (patch) | |
tree | ca41facdc825a03b8946ec2eb6b1ede3cb53ffa8 /lib | |
parent | d40b16338e2bea9bcea6aeaf36a63ed755d0b747 (diff) | |
download | perl-9c6f85782f1cd7b0fd0a41187c4ee95878c501d4.tar.gz |
Add emulation layer for Thread/Semaphore and Thread/Queue
p4raw-id: //depot/perl@16559
Diffstat (limited to 'lib')
-rw-r--r-- | lib/Thread/Queue.pm | 120 | ||||
-rw-r--r-- | lib/Thread/Semaphore.pm | 106 |
2 files changed, 226 insertions, 0 deletions
diff --git a/lib/Thread/Queue.pm b/lib/Thread/Queue.pm new file mode 100644 index 0000000000..52854681cf --- /dev/null +++ b/lib/Thread/Queue.pm @@ -0,0 +1,120 @@ +package Thread::Queue; + +our $VERSION = '1.00'; + +our $ithreads; +our $othreads; + +use Thread qw(cond_wait cond_broadcast); + +BEGIN { + use Config; + $ithreads = $Config{useithreads}; + $othreads = $Config{use5005threads}; + if($ithreads) { + require 'threads/shared/queue.pm'; + for my $m (qw(new enqueue dequeue dequeue_nb pending)) { + no strict 'refs'; + *{"Thread::Queue::$m"} = \&{"threads::shared::queue::${m}"}; + } + } else { + for my $m (qw(new enqueue dequeue dequeue_nb pending)) { + no strict 'refs'; + *{"Thread::Queue::$m"} = \&{"Thread::Queue::${m}_othread"}; + } + } +} + + +=head1 NAME + +Thread::Queue - thread-safe queues + +=head1 SYNOPSIS + + use Thread::Queue; + my $q = new Thread::Queue; + $q->enqueue("foo", "bar"); + my $foo = $q->dequeue; # The "bar" is still in the queue. + my $foo = $q->dequeue_nb; # returns "bar", or undef if the queue was + # empty + my $left = $q->pending; # returns the number of items still in the queue + +=head1 DESCRIPTION + +A queue, as implemented by C<Thread::Queue> is a thread-safe data structure +much like a list. Any number of threads can safely add elements to the end +of the list, or remove elements from the head of the list. (Queues don't +permit adding or removing elements from the middle of the list) + +=head1 FUNCTIONS AND METHODS + +=over 8 + +=item new + +The C<new> function creates a new empty queue. + +=item enqueue LIST + +The C<enqueue> method adds a list of scalars on to the end of the queue. +The queue will grow as needed to accomodate the list. + +=item dequeue + +The C<dequeue> method removes a scalar from the head of the queue and +returns it. If the queue is currently empty, C<dequeue> will block the +thread until another thread C<enqueue>s a scalar. + +=item dequeue_nb + +The C<dequeue_nb> method, like the C<dequeue> method, removes a scalar from +the head of the queue and returns it. Unlike C<dequeue>, though, +C<dequeue_nb> won't block if the queue is empty, instead returning +C<undef>. + +=item pending + +The C<pending> method returns the number of items still in the queue. (If +there can be multiple readers on the queue it's best to lock the queue +before checking to make sure that it stays in a consistent state) + +=back + +=head1 SEE ALSO + +L<Thread> + +=cut + +sub new_othread { + my $class = shift; + return bless [@_], $class; +} + +sub dequeue_othread : locked : method { + my $q = shift; + cond_wait $q until @$q; + return shift @$q; +} + +sub dequeue_nb_othread : locked : method { + my $q = shift; + if (@$q) { + return shift @$q; + } else { + return undef; + } +} + +sub enqueue_othread : locked : method { + my $q = shift; + push(@$q, @_) and cond_broadcast $q; +} + +sub pending_othread : locked : method { + my $q = shift; + return scalar(@$q); +} + +1; diff --git a/lib/Thread/Semaphore.pm b/lib/Thread/Semaphore.pm new file mode 100644 index 0000000000..66e8878c8e --- /dev/null +++ b/lib/Thread/Semaphore.pm @@ -0,0 +1,106 @@ +package Thread::Semaphore; +use Thread qw(cond_wait cond_broadcast); + +our $VERSION = '1.00'; + +BEGIN { + use Config; + $ithreads = $Config{useithreads}; + $othreads = $Config{use5005threads}; + if($ithreads) { + require 'threads/shared/semaphore.pm'; + for my $m (qw(new up down)) { + no strict 'refs'; + *{"Thread::Semaphore::$m"} = \&{"threads::shared::semaphore::${m}"}; + } + } else { + for my $m (qw(new up down)) { + no strict 'refs'; + *{"Thread::Semaphore::$m"} = \&{"Thread::Semaphore::${m}_othread"}; + } + } +} + + +=head1 NAME + +Thread::Semaphore - thread-safe semaphores + +=head1 SYNOPSIS + + use Thread::Semaphore; + my $s = new Thread::Semaphore; + $s->up; # Also known as the semaphore V -operation. + # The guarded section is here + $s->down; # Also known as the semaphore P -operation. + + # The default semaphore value is 1. + my $s = new Thread::Semaphore($initial_value); + $s->up($up_value); + $s->down($up_value); + +=head1 DESCRIPTION + +Semaphores provide a mechanism to regulate access to resources. Semaphores, +unlike locks, aren't tied to particular scalars, and so may be used to +control access to anything you care to use them for. + +Semaphores don't limit their values to zero or one, so they can be used to +control access to some resource that may have more than one of. (For +example, filehandles) Increment and decrement amounts aren't fixed at one +either, so threads can reserve or return multiple resources at once. + +=head1 FUNCTIONS AND METHODS + +=over 8 + +=item new + +=item new NUMBER + +C<new> creates a new semaphore, and initializes its count to the passed +number. If no number is passed, the semaphore's count is set to one. + +=item down + +=item down NUMBER + +The C<down> method decreases the semaphore's count by the specified number, +or one if no number has been specified. If the semaphore's count would drop +below zero, this method will block until such time that the semaphore's +count is equal to or larger than the amount you're C<down>ing the +semaphore's count by. + +=item up + +=item up NUMBER + +The C<up> method increases the semaphore's count by the number specified, +or one if no number's been specified. This will unblock any thread blocked +trying to C<down> the semaphore if the C<up> raises the semaphore count +above what the C<down>s are trying to decrement it by. + +=back + +=cut + +sub new_othread { + my $class = shift; + my $val = @_ ? shift : 1; + bless \$val, $class; +} + +sub down_othread : locked : method { + my $s = shift; + my $inc = @_ ? shift : 1; + cond_wait $s until $$s >= $inc; + $$s -= $inc; +} + +sub up_othread : locked : method { + my $s = shift; + my $inc = @_ ? shift : 1; + ($$s += $inc) > 0 and cond_broadcast $s; +} + +1; |