summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorArtur Bergman <sky@nanisky.com>2002-05-12 17:59:41 +0000
committerArtur Bergman <sky@nanisky.com>2002-05-12 17:59:41 +0000
commit9c6f85782f1cd7b0fd0a41187c4ee95878c501d4 (patch)
treeca41facdc825a03b8946ec2eb6b1ede3cb53ffa8 /lib
parentd40b16338e2bea9bcea6aeaf36a63ed755d0b747 (diff)
downloadperl-9c6f85782f1cd7b0fd0a41187c4ee95878c501d4.tar.gz
Add emulation layer for Thread/Semaphore and Thread/Queue
p4raw-id: //depot/perl@16559
Diffstat (limited to 'lib')
-rw-r--r--lib/Thread/Queue.pm120
-rw-r--r--lib/Thread/Semaphore.pm106
2 files changed, 226 insertions, 0 deletions
diff --git a/lib/Thread/Queue.pm b/lib/Thread/Queue.pm
new file mode 100644
index 0000000000..52854681cf
--- /dev/null
+++ b/lib/Thread/Queue.pm
@@ -0,0 +1,120 @@
+package Thread::Queue;
+
+our $VERSION = '1.00';
+
+our $ithreads;
+our $othreads;
+
+use Thread qw(cond_wait cond_broadcast);
+
+BEGIN {
+ use Config;
+ $ithreads = $Config{useithreads};
+ $othreads = $Config{use5005threads};
+ if($ithreads) {
+ require 'threads/shared/queue.pm';
+ for my $m (qw(new enqueue dequeue dequeue_nb pending)) {
+ no strict 'refs';
+ *{"Thread::Queue::$m"} = \&{"threads::shared::queue::${m}"};
+ }
+ } else {
+ for my $m (qw(new enqueue dequeue dequeue_nb pending)) {
+ no strict 'refs';
+ *{"Thread::Queue::$m"} = \&{"Thread::Queue::${m}_othread"};
+ }
+ }
+}
+
+
+=head1 NAME
+
+Thread::Queue - thread-safe queues
+
+=head1 SYNOPSIS
+
+ use Thread::Queue;
+ my $q = new Thread::Queue;
+ $q->enqueue("foo", "bar");
+ my $foo = $q->dequeue; # The "bar" is still in the queue.
+ my $foo = $q->dequeue_nb; # returns "bar", or undef if the queue was
+ # empty
+ my $left = $q->pending; # returns the number of items still in the queue
+
+=head1 DESCRIPTION
+
+A queue, as implemented by C<Thread::Queue> is a thread-safe data structure
+much like a list. Any number of threads can safely add elements to the end
+of the list, or remove elements from the head of the list. (Queues don't
+permit adding or removing elements from the middle of the list)
+
+=head1 FUNCTIONS AND METHODS
+
+=over 8
+
+=item new
+
+The C<new> function creates a new empty queue.
+
+=item enqueue LIST
+
+The C<enqueue> method adds a list of scalars on to the end of the queue.
+The queue will grow as needed to accomodate the list.
+
+=item dequeue
+
+The C<dequeue> method removes a scalar from the head of the queue and
+returns it. If the queue is currently empty, C<dequeue> will block the
+thread until another thread C<enqueue>s a scalar.
+
+=item dequeue_nb
+
+The C<dequeue_nb> method, like the C<dequeue> method, removes a scalar from
+the head of the queue and returns it. Unlike C<dequeue>, though,
+C<dequeue_nb> won't block if the queue is empty, instead returning
+C<undef>.
+
+=item pending
+
+The C<pending> method returns the number of items still in the queue. (If
+there can be multiple readers on the queue it's best to lock the queue
+before checking to make sure that it stays in a consistent state)
+
+=back
+
+=head1 SEE ALSO
+
+L<Thread>
+
+=cut
+
+sub new_othread {
+ my $class = shift;
+ return bless [@_], $class;
+}
+
+sub dequeue_othread : locked : method {
+ my $q = shift;
+ cond_wait $q until @$q;
+ return shift @$q;
+}
+
+sub dequeue_nb_othread : locked : method {
+ my $q = shift;
+ if (@$q) {
+ return shift @$q;
+ } else {
+ return undef;
+ }
+}
+
+sub enqueue_othread : locked : method {
+ my $q = shift;
+ push(@$q, @_) and cond_broadcast $q;
+}
+
+sub pending_othread : locked : method {
+ my $q = shift;
+ return scalar(@$q);
+}
+
+1;
diff --git a/lib/Thread/Semaphore.pm b/lib/Thread/Semaphore.pm
new file mode 100644
index 0000000000..66e8878c8e
--- /dev/null
+++ b/lib/Thread/Semaphore.pm
@@ -0,0 +1,106 @@
+package Thread::Semaphore;
+use Thread qw(cond_wait cond_broadcast);
+
+our $VERSION = '1.00';
+
+BEGIN {
+ use Config;
+ $ithreads = $Config{useithreads};
+ $othreads = $Config{use5005threads};
+ if($ithreads) {
+ require 'threads/shared/semaphore.pm';
+ for my $m (qw(new up down)) {
+ no strict 'refs';
+ *{"Thread::Semaphore::$m"} = \&{"threads::shared::semaphore::${m}"};
+ }
+ } else {
+ for my $m (qw(new up down)) {
+ no strict 'refs';
+ *{"Thread::Semaphore::$m"} = \&{"Thread::Semaphore::${m}_othread"};
+ }
+ }
+}
+
+
+=head1 NAME
+
+Thread::Semaphore - thread-safe semaphores
+
+=head1 SYNOPSIS
+
+ use Thread::Semaphore;
+ my $s = new Thread::Semaphore;
+ $s->up; # Also known as the semaphore V -operation.
+ # The guarded section is here
+ $s->down; # Also known as the semaphore P -operation.
+
+ # The default semaphore value is 1.
+ my $s = new Thread::Semaphore($initial_value);
+ $s->up($up_value);
+ $s->down($up_value);
+
+=head1 DESCRIPTION
+
+Semaphores provide a mechanism to regulate access to resources. Semaphores,
+unlike locks, aren't tied to particular scalars, and so may be used to
+control access to anything you care to use them for.
+
+Semaphores don't limit their values to zero or one, so they can be used to
+control access to some resource that may have more than one of. (For
+example, filehandles) Increment and decrement amounts aren't fixed at one
+either, so threads can reserve or return multiple resources at once.
+
+=head1 FUNCTIONS AND METHODS
+
+=over 8
+
+=item new
+
+=item new NUMBER
+
+C<new> creates a new semaphore, and initializes its count to the passed
+number. If no number is passed, the semaphore's count is set to one.
+
+=item down
+
+=item down NUMBER
+
+The C<down> method decreases the semaphore's count by the specified number,
+or one if no number has been specified. If the semaphore's count would drop
+below zero, this method will block until such time that the semaphore's
+count is equal to or larger than the amount you're C<down>ing the
+semaphore's count by.
+
+=item up
+
+=item up NUMBER
+
+The C<up> method increases the semaphore's count by the number specified,
+or one if no number's been specified. This will unblock any thread blocked
+trying to C<down> the semaphore if the C<up> raises the semaphore count
+above what the C<down>s are trying to decrement it by.
+
+=back
+
+=cut
+
+sub new_othread {
+ my $class = shift;
+ my $val = @_ ? shift : 1;
+ bless \$val, $class;
+}
+
+sub down_othread : locked : method {
+ my $s = shift;
+ my $inc = @_ ? shift : 1;
+ cond_wait $s until $$s >= $inc;
+ $$s -= $inc;
+}
+
+sub up_othread : locked : method {
+ my $s = shift;
+ my $inc = @_ ? shift : 1;
+ ($$s += $inc) > 0 and cond_broadcast $s;
+}
+
+1;