diff options
author | Malcolm Beattie <mbeattie@sable.ox.ac.uk> | 1997-10-16 16:42:13 +0000 |
---|---|---|
committer | Malcolm Beattie <mbeattie@sable.ox.ac.uk> | 1997-10-16 16:42:13 +0000 |
commit | c1f329308b702519079b58d1f4c523ce60dc9990 (patch) | |
tree | 055f535c918fdf314c15c62968c32da6b1cf1ac7 | |
parent | 8023c3ceb7a7110c55b3159dff471253f72f7e15 (diff) | |
parent | 0a00ffdb1e70eb883974513d0ee6f4afd54aca19 (diff) | |
download | perl-c1f329308b702519079b58d1f4c523ce60dc9990.tar.gz |
Move perlext/Thread into perl/ext/Thread.
p4raw-id: //depot/perl@141
-rwxr-xr-x | Configure | 4 | ||||
-rw-r--r-- | ext/Thread/Makefile.PL | 2 | ||||
-rw-r--r-- | ext/Thread/Notes | 13 | ||||
-rw-r--r-- | ext/Thread/README | 20 | ||||
-rw-r--r-- | ext/Thread/Thread.pm | 20 | ||||
-rw-r--r-- | ext/Thread/Thread.xs | 550 | ||||
-rw-r--r-- | ext/Thread/Thread/Queue.pm | 22 | ||||
-rw-r--r-- | ext/Thread/Thread/Semaphore.pm | 25 | ||||
-rw-r--r-- | ext/Thread/create.t | 17 | ||||
-rw-r--r-- | ext/Thread/io.t | 25 | ||||
-rw-r--r-- | ext/Thread/join.t | 11 | ||||
-rw-r--r-- | ext/Thread/join2.t | 12 | ||||
-rw-r--r-- | ext/Thread/list.t | 30 | ||||
-rw-r--r-- | ext/Thread/lock.t | 27 | ||||
-rw-r--r-- | ext/Thread/queue.t | 36 | ||||
-rw-r--r-- | ext/Thread/sync.t | 61 | ||||
-rw-r--r-- | ext/Thread/sync2.t | 69 | ||||
-rw-r--r-- | ext/Thread/typemap | 24 | ||||
-rw-r--r-- | ext/Thread/unsync.t | 37 | ||||
-rw-r--r-- | ext/Thread/unsync2.t | 36 | ||||
-rw-r--r-- | ext/Thread/unsync3.t | 50 | ||||
-rw-r--r-- | ext/Thread/unsync4.t | 38 |
22 files changed, 1129 insertions, 0 deletions
@@ -9829,6 +9829,10 @@ for xxx in $known_extensions ; do $define) avail_ext="$avail_ext $xxx" ;; esac ;; + Thread) case "$usethreads" in + $define) avail_ext="$avail_ext $xxx" ;; + esac + ;; *) avail_ext="$avail_ext $xxx" ;; esac diff --git a/ext/Thread/Makefile.PL b/ext/Thread/Makefile.PL new file mode 100644 index 0000000000..d699091cc1 --- /dev/null +++ b/ext/Thread/Makefile.PL @@ -0,0 +1,2 @@ +use ExtUtils::MakeMaker; +WriteMakefile(NAME => "Thread"); diff --git a/ext/Thread/Notes b/ext/Thread/Notes new file mode 100644 index 0000000000..1505877ee9 --- /dev/null +++ b/ext/Thread/Notes @@ -0,0 +1,13 @@ +Should cvcache be per CV (keyed by thread) or per thread (keyed by CV)? + +Maybe ought to protect all SVs by a mutex for SvREFCNT_{dec,inc}, +upgrades and so on. Then use SvMUTEX instead of CvMUTEX for CVs. +On the other hand, people shouldn't expect concurrent operations +on non-lexicals to be safe anyway. + +Probably don't need to bother keeping track of CvOWNER on clones. + +Either @_ needs to be made lexical or other arrangments need to be +made so that some globs (or just *_) are per-thread. + +tokenbuf and buf probably ought to be global protected by a global lock. diff --git a/ext/Thread/README b/ext/Thread/README new file mode 100644 index 0000000000..a6b22fb4ae --- /dev/null +++ b/ext/Thread/README @@ -0,0 +1,20 @@ +See the README.threads in the main perl 5.004_xx development +distribution (x >= 50) for details of how to build and use this. +If all else fails, read on. + +If your version of patch can't create a file from scratch, then you'll +need to create an empty thread.h manually first. Perl itself will need +to be built with -DUSE_THREADS yet. If you're using MIT pthreads or +another threads package that needs pthread_init() to be called, then +add -DNEED_PTHREAD_INIT. If you're using a threads library that only +follows one of the old POSIX drafts, then you'll probably need to add +-DOLD_PTHREADS_API. I haven't tested -DOLD_PTHREADS_API properly yet +and I think you may still have to tweak a couple of the mutex calls +to follow the old API. + +This extension is copyright Malcolm Beattie 1995-1997 and is freely +distributable under your choice of the GNU Public License or the +Artistic License (see the main perl distribution). + +Malcolm Beattie +mbeattie@sable.ox.ac.uk diff --git a/ext/Thread/Thread.pm b/ext/Thread/Thread.pm new file mode 100644 index 0000000000..2ace5dde2d --- /dev/null +++ b/ext/Thread/Thread.pm @@ -0,0 +1,20 @@ +package Thread; +require Exporter; +require DynaLoader; +@ISA = qw(Exporter DynaLoader); +@EXPORT_OK = qw(yield cond_signal cond_broadcast cond_wait async); + +# +# Methods +# + +# +# Exported functions +# +sub async (&) { + return new Thread $_[0]; +} + +bootstrap Thread; + +1; diff --git a/ext/Thread/Thread.xs b/ext/Thread/Thread.xs new file mode 100644 index 0000000000..3dc25162a7 --- /dev/null +++ b/ext/Thread/Thread.xs @@ -0,0 +1,550 @@ +#include "EXTERN.h" +#include "perl.h" +#include "XSUB.h" + +/* Magic signature for Thread's mg_private is "Th" */ +#define Thread_MAGIC_SIGNATURE 0x5468 + +static U32 threadnum = 0; +static int sig_pipe[2]; + +static void +remove_thread(t) +Thread t; +{ + DEBUG_L(WITH_THR(PerlIO_printf(PerlIO_stderr(), + "%p: remove_thread %p\n", thr, t))); + MUTEX_LOCK(&threads_mutex); + MUTEX_DESTROY(&t->mutex); + nthreads--; + t->prev->next = t->next; + t->next->prev = t->prev; + COND_BROADCAST(&nthreads_cond); + MUTEX_UNLOCK(&threads_mutex); +} + +static void * +threadstart(arg) +void *arg; +{ +#ifdef FAKE_THREADS + Thread savethread = thr; + LOGOP myop; + dSP; + I32 oldscope = scopestack_ix; + I32 retval; + AV *returnav; + int i; + + DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n", + thr, SvPEEK(TOPs))); + thr = (Thread) arg; + savemark = TOPMARK; + thr->prev = thr->prev_run = savethread; + thr->next = savethread->next; + thr->next_run = savethread->next_run; + savethread->next = savethread->next_run = thr; + thr->wait_queue = 0; + thr->private = 0; + + /* Now duplicate most of perl_call_sv but with a few twists */ + op = (OP*)&myop; + Zero(op, 1, LOGOP); + myop.op_flags = OPf_STACKED; + myop.op_next = Nullop; + myop.op_flags |= OPf_KNOW; + myop.op_flags |= OPf_WANT_LIST; + op = pp_entersub(ARGS); + DEBUG_L(if (!op) + PerlIO_printf(PerlIO_stderr(), "thread starts at Nullop\n")); + /* + * When this thread is next scheduled, we start in the right + * place. When the thread runs off the end of the sub, perl.c + * handles things, using savemark to figure out how much of the + * stack is the return value for any join. + */ + thr = savethread; /* back to the old thread */ + return 0; +#else + Thread thr = (Thread) arg; + LOGOP myop; + dSP; + I32 oldmark = TOPMARK; + I32 oldscope = scopestack_ix; + I32 retval; + AV *returnav; + int i, ret; + dJMPENV; + + /* Don't call *anything* requiring dTHR until after pthread_setspecific */ + /* + * Wait until our creator releases us. If we didn't do this, then + * it would be potentially possible for out thread to carry on and + * do stuff before our creator fills in our "self" field. For example, + * if we went and created another thread which tried to pthread_join + * with us, then we'd be in a mess. + */ + MUTEX_LOCK(&thr->mutex); + MUTEX_UNLOCK(&thr->mutex); + + /* + * It's safe to wait until now to set the thread-specific pointer + * from our pthread_t structure to our struct thread, since we're + * the only thread who can get at it anyway. + */ + if (pthread_setspecific(thr_key, (void *) thr)) + croak("panic: pthread_setspecific"); + + /* Only now can we use SvPEEK (which calls sv_newmortal which does dTHR) */ + DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n", + thr, SvPEEK(TOPs))); + + JMPENV_PUSH(ret); + switch (ret) { + case 3: + PerlIO_printf(PerlIO_stderr(), "panic: threadstart\n"); + /* fall through */ + case 1: + STATUS_ALL_FAILURE; + /* fall through */ + case 2: + /* my_exit() was called */ + while (scopestack_ix > oldscope) + LEAVE; + JMPENV_POP; + av_store(returnav, 0, newSViv(statusvalue)); + goto finishoff; + } + + /* Now duplicate most of perl_call_sv but with a few twists */ + op = (OP*)&myop; + Zero(op, 1, LOGOP); + myop.op_flags = OPf_STACKED; + myop.op_next = Nullop; + myop.op_flags |= OPf_KNOW; + myop.op_flags |= OPf_WANT_LIST; + op = pp_entersub(ARGS); + if (op) + runops(); + SPAGAIN; + retval = sp - (stack_base + oldmark); + sp = stack_base + oldmark + 1; + DEBUG_L(for (i = 1; i <= retval; i++) + PerlIO_printf(PerlIO_stderr(), + "%p returnav[%d] = %s\n", + thr, i, SvPEEK(sp[i - 1]));) + returnav = newAV(); + av_store(returnav, 0, newSVpv("", 0)); + for (i = 1; i <= retval; i++, sp++) + sv_setsv(*av_fetch(returnav, i, TRUE), SvREFCNT_inc(*sp)); + + finishoff: +#if 0 + /* removed for debug */ + SvREFCNT_dec(curstack); +#endif + SvREFCNT_dec(cvcache); + Safefree(markstack); + Safefree(scopestack); + Safefree(savestack); + Safefree(retstack); + Safefree(cxstack); + Safefree(tmps_stack); + + MUTEX_LOCK(&thr->mutex); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), + "%p: threadstart finishing: state is %u\n", + thr, ThrSTATE(thr))); + switch (ThrSTATE(thr)) { + case THRf_R_JOINABLE: + ThrSETSTATE(thr, THRf_ZOMBIE); + MUTEX_UNLOCK(&thr->mutex); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), + "%p: R_JOINABLE thread finished\n", thr)); + break; + case THRf_R_JOINED: + ThrSETSTATE(thr, THRf_DEAD); + MUTEX_UNLOCK(&thr->mutex); + remove_thread(thr); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), + "%p: R_JOINED thread finished\n", thr)); + break; + case THRf_R_DETACHED: + ThrSETSTATE(thr, THRf_DEAD); + MUTEX_UNLOCK(&thr->mutex); + SvREFCNT_dec(returnav); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), + "%p: DETACHED thread finished\n", thr)); + remove_thread(thr); /* This might trigger main thread to finish */ + break; + default: + MUTEX_UNLOCK(&thr->mutex); + croak("panic: illegal state %u at end of threadstart", ThrSTATE(thr)); + /* NOTREACHED */ + } + return (void *) returnav; /* Available for anyone to join with us */ + /* unless we are detached in which case */ + /* noone will see the value anyway. */ +#endif +} + +static SV * +newthread(startsv, initargs, class) +SV *startsv; +AV *initargs; +char *class; +{ + dTHR; + dSP; + Thread savethread; + int i; + SV *sv; + sigset_t fullmask, oldmask; + + savethread = thr; + sv = newSVpv("", 0); + SvGROW(sv, sizeof(struct thread) + 1); + SvCUR_set(sv, sizeof(struct thread)); + thr = (Thread) SvPVX(sv); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread(%s) = %p)\n", + savethread, SvPEEK(startsv), thr)); + oursv = sv; + /* If we don't zero these foostack pointers, init_stacks won't init them */ + markstack = 0; + scopestack = 0; + savestack = 0; + retstack = 0; + init_stacks(ARGS); + curcop = savethread->Tcurcop; /* XXX As good a guess as any? */ + SPAGAIN; + defstash = savethread->Tdefstash; /* XXX maybe these should */ + curstash = savethread->Tcurstash; /* always be set to main? */ + /* top_env? */ + /* runlevel */ + cvcache = newHV(); + thr->flags = THRf_R_JOINABLE; + MUTEX_INIT(&thr->mutex); + thr->tid = ++threadnum; + /* Insert new thread into the circular linked list and bump nthreads */ + MUTEX_LOCK(&threads_mutex); + thr->next = savethread->next; + thr->prev = savethread; + savethread->next = thr; + thr->next->prev = thr; + nthreads++; + MUTEX_UNLOCK(&threads_mutex); + + DEBUG_L(PerlIO_printf(PerlIO_stderr(), + "%p: newthread, tid is %u, preparing stack\n", + savethread, thr->tid)); + /* The following pushes the arg list and startsv onto the *new* stack */ + PUSHMARK(sp); + /* Could easily speed up the following greatly */ + for (i = 0; i <= AvFILL(initargs); i++) + XPUSHs(SvREFCNT_inc(*av_fetch(initargs, i, FALSE))); + XPUSHs(SvREFCNT_inc(startsv)); + PUTBACK; + +#ifdef FAKE_THREADS + threadstart(thr); +#else + /* On your marks... */ + MUTEX_LOCK(&thr->mutex); + /* Get set... + * Increment the global thread count. + */ + sigfillset(&fullmask); + if (sigprocmask(SIG_SETMASK, &fullmask, &oldmask) == -1) + croak("panic: sigprocmask"); + if (pthread_create(&self, NULL, threadstart, (void*) thr)) + return NULL; /* XXX should clean up first */ + /* Go */ + MUTEX_UNLOCK(&thr->mutex); + if (sigprocmask(SIG_SETMASK, &oldmask, 0)) + croak("panic: sigprocmask"); +#endif + sv = newSViv(thr->tid); + sv_magic(sv, oursv, '~', 0, 0); + SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE; + return sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE)); +} + +static Signal_t +handle_thread_signal(sig) +int sig; +{ + char c = (char) sig; + write(sig_pipe[0], &c, 1); +} + +MODULE = Thread PACKAGE = Thread + +void +new(class, startsv, ...) + char * class + SV * startsv + AV * av = av_make(items - 2, &ST(2)); + PPCODE: + XPUSHs(sv_2mortal(newthread(startsv, av, class))); + +void +join(t) + Thread t + AV * av = NO_INIT + int i = NO_INIT + PPCODE: + DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: joining %p (state %u)\n", + thr, t, ThrSTATE(t));); + MUTEX_LOCK(&t->mutex); + switch (ThrSTATE(t)) { + case THRf_R_JOINABLE: + case THRf_R_JOINED: + ThrSETSTATE(t, THRf_R_JOINED); + MUTEX_UNLOCK(&t->mutex); + break; + case THRf_ZOMBIE: + ThrSETSTATE(t, THRf_DEAD); + MUTEX_UNLOCK(&t->mutex); + remove_thread(t); + break; + default: + MUTEX_UNLOCK(&t->mutex); + croak("can't join with thread"); + /* NOTREACHED */ + } + if (pthread_join(t->Tself, (void **) &av)) + croak("pthread_join failed"); + + /* Could easily speed up the following if necessary */ + for (i = 0; i <= AvFILL(av); i++) + XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE))); + +void +detach(t) + Thread t + CODE: + DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: detaching %p (state %u)\n", + thr, t, ThrSTATE(t));); + MUTEX_LOCK(&t->mutex); + switch (ThrSTATE(t)) { + case THRf_R_JOINABLE: + ThrSETSTATE(t, THRf_R_DETACHED); + /* fall through */ + case THRf_R_DETACHED: + DETACH(t); + MUTEX_UNLOCK(&t->mutex); + break; + case THRf_ZOMBIE: + ThrSETSTATE(t, THRf_DEAD); + DETACH(t); + MUTEX_UNLOCK(&t->mutex); + remove_thread(t); + break; + default: + MUTEX_UNLOCK(&t->mutex); + croak("can't detach thread"); + /* NOTREACHED */ + } + +void +equal(t1, t2) + Thread t1 + Thread t2 + PPCODE: + PUSHs((t1 == t2) ? &sv_yes : &sv_no); + +void +flags(t) + Thread t + PPCODE: + PUSHs(sv_2mortal(newSViv(t->flags))); + +void +self(class) + char * class + PREINIT: + SV *sv; + PPCODE: + sv = newSViv(thr->tid); + sv_magic(sv, oursv, '~', 0, 0); + SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE; + PUSHs(sv_2mortal(sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE)))); + +U32 +tid(t) + Thread t + CODE: + MUTEX_LOCK(&t->mutex); + RETVAL = t->tid; + MUTEX_UNLOCK(&t->mutex); + OUTPUT: + RETVAL + +void +DESTROY(t) + SV * t + PPCODE: + PUSHs(&sv_yes); + +void +yield() + CODE: +#ifdef OLD_PTHREADS_API + pthread_yield(); +#else +#ifndef NO_SCHED_YIELD + sched_yield(); +#endif /* NO_SCHED_YIELD */ +#endif /* OLD_PTHREADS_API */ + +void +cond_wait(sv) + SV * sv + MAGIC * mg = NO_INIT +CODE: + if (SvROK(sv)) + sv = SvRV(sv); + + mg = condpair_magic(sv); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_wait %p\n", thr, sv)); + MUTEX_LOCK(MgMUTEXP(mg)); + if (MgOWNER(mg) != thr) { + MUTEX_UNLOCK(MgMUTEXP(mg)); + croak("cond_wait for lock that we don't own\n"); + } + MgOWNER(mg) = 0; + COND_WAIT(MgCONDP(mg), MgMUTEXP(mg)); + while (MgOWNER(mg)) + COND_WAIT(MgOWNERCONDP(mg), MgMUTEXP(mg)); + MgOWNER(mg) = thr; + MUTEX_UNLOCK(MgMUTEXP(mg)); + +void +cond_signal(sv) + SV * sv + MAGIC * mg = NO_INIT +CODE: + if (SvROK(sv)) + sv = SvRV(sv); + + mg = condpair_magic(sv); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_signal %p\n",thr,sv)); + MUTEX_LOCK(MgMUTEXP(mg)); + if (MgOWNER(mg) != thr) { + MUTEX_UNLOCK(MgMUTEXP(mg)); + croak("cond_signal for lock that we don't own\n"); + } + COND_SIGNAL(MgCONDP(mg)); + MUTEX_UNLOCK(MgMUTEXP(mg)); + +void +cond_broadcast(sv) + SV * sv + MAGIC * mg = NO_INIT +CODE: + if (SvROK(sv)) + sv = SvRV(sv); + + mg = condpair_magic(sv); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_broadcast %p\n", + thr, sv)); + MUTEX_LOCK(MgMUTEXP(mg)); + if (MgOWNER(mg) != thr) { + MUTEX_UNLOCK(MgMUTEXP(mg)); + croak("cond_broadcast for lock that we don't own\n"); + } + COND_BROADCAST(MgCONDP(mg)); + MUTEX_UNLOCK(MgMUTEXP(mg)); + +void +list(class) + char * class + PREINIT: + Thread t; + AV * av; + SV ** svp; + int n = 0; + PPCODE: + av = newAV(); + /* + * Iterate until we have enough dynamic storage for all threads. + * We mustn't do any allocation while holding threads_mutex though. + */ + MUTEX_LOCK(&threads_mutex); + do { + n = nthreads; + MUTEX_UNLOCK(&threads_mutex); + if (AvFILL(av) < n - 1) { + int i = AvFILL(av); + for (i = AvFILL(av); i < n - 1; i++) { + SV *sv = newSViv(0); /* fill in tid later */ + sv_magic(sv, 0, '~', 0, 0); /* fill in other magic later */ + av_push(av, sv_bless(newRV_noinc(sv), + gv_stashpv(class, TRUE))); + + } + } + MUTEX_LOCK(&threads_mutex); + } while (n < nthreads); + n = nthreads; /* Get the final correct value */ + + /* + * At this point, there's enough room to fill in av. + * Note that we are holding threads_mutex so the list + * won't change out from under us but all the remaining + * processing is "fast" (no blocking, malloc etc.) + */ + t = thr; + svp = AvARRAY(av); + do { + SV *sv = (SV*)SvRV(*svp); + sv_setiv(sv, t->tid); + SvMAGIC(sv)->mg_obj = SvREFCNT_inc(t->Toursv); + SvMAGIC(sv)->mg_flags |= MGf_REFCOUNTED; + SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE; + t = t->next; + svp++; + } while (t != thr); + /* */ + MUTEX_UNLOCK(&threads_mutex); + /* Truncate any unneeded slots in av */ + av_fill(av, n - 1); + /* Finally, push all the new objects onto the stack and drop av */ + EXTEND(sp, n); + for (svp = AvARRAY(av); n > 0; n--, svp++) + PUSHs(*svp); + (void)sv_2mortal((SV*)av); + + +MODULE = Thread PACKAGE = Thread::Signal + +void +kill_sighandler_thread() + PPCODE: + write(sig_pipe[0], "\0", 1); + PUSHs(&sv_yes); + +void +init_thread_signals() + PPCODE: + sighandlerp = handle_thread_signal; + if (pipe(sig_pipe) == -1) + XSRETURN_UNDEF; + PUSHs(&sv_yes); + +SV * +await_signal() + PREINIT: + char c; + ssize_t ret; + CODE: + do { + ret = read(sig_pipe[1], &c, 1); + } while (ret == -1 && errno == EINTR); + if (ret == -1) + croak("panic: await_signal"); + if (ret == 0) + XSRETURN_UNDEF; + RETVAL = c ? psig_ptr[c] : &sv_no; + OUTPUT: + RETVAL diff --git a/ext/Thread/Thread/Queue.pm b/ext/Thread/Thread/Queue.pm new file mode 100644 index 0000000000..4eef978bd6 --- /dev/null +++ b/ext/Thread/Thread/Queue.pm @@ -0,0 +1,22 @@ +package Thread::Queue; +use Thread qw(cond_wait cond_broadcast); + +sub new { + my $class = shift; + return bless [@_], $class; +} + +sub dequeue { + use attrs qw(locked method); + my $q = shift; + cond_wait $q until @$q; + return shift @$q; +} + +sub enqueue { + use attrs qw(locked method); + my $q = shift; + push(@$q, @_) and cond_broadcast $q; +} + +1; diff --git a/ext/Thread/Thread/Semaphore.pm b/ext/Thread/Thread/Semaphore.pm new file mode 100644 index 0000000000..9e5852f15c --- /dev/null +++ b/ext/Thread/Thread/Semaphore.pm @@ -0,0 +1,25 @@ +package Thread::Semaphore; +use Thread qw(cond_wait cond_broadcast); + +sub new { + my $class = shift; + my $val = @_ ? shift : 1; + bless \$val, $class; +} + +sub down { + use attrs qw(locked method); + my $s = shift; + my $inc = @_ ? shift : 1; + cond_wait $s until $$s >= $inc; + $$s -= $inc; +} + +sub up { + use attrs qw(locked method); + my $s = shift; + my $inc = @_ ? shift : 1; + ($$s += $inc) > 0 and cond_broadcast $s; +} + +1; diff --git a/ext/Thread/create.t b/ext/Thread/create.t new file mode 100644 index 0000000000..7d6d189e92 --- /dev/null +++ b/ext/Thread/create.t @@ -0,0 +1,17 @@ +use Thread; +sub start_here { + my $i; + print "In start_here with args: @_\n"; + for ($i = 1; $i <= 5; $i++) { + print "start_here: $i\n"; + sleep 1; + } +} + +print "Starting new thread now\n"; +$t = new Thread \&start_here, qw(foo bar baz); +print "Started thread $t\n"; +for ($count = 1; $count <= 5; $count++) { + print "main: $count\n"; + sleep 1; +} diff --git a/ext/Thread/io.t b/ext/Thread/io.t new file mode 100644 index 0000000000..8ade26504d --- /dev/null +++ b/ext/Thread/io.t @@ -0,0 +1,25 @@ +use Thread; + +sub reader { + my $line; + while ($line = <STDIN>) { + print "reader: $line"; + } + print "End of input in reader\n"; + return 0; +} + +print <<'EOT'; +This test starts up a thread to read and echo whatever is typed on +the keyboard/stdin, line by line, while the main thread counts down +to zero. The test stays running until both the main thread has +finished counting down and the I/O thread has seen end-of-file on +the terminal/stdin. +EOT + +$r = new Thread \&reader; +$count = 10; +while ($count--) { + sleep 1; + print "ping $count\n"; +} diff --git a/ext/Thread/join.t b/ext/Thread/join.t new file mode 100644 index 0000000000..640256a9b3 --- /dev/null +++ b/ext/Thread/join.t @@ -0,0 +1,11 @@ +use Thread; +sub foo { + print "In foo with args: @_\n"; + return (7, 8, 9); +} + +print "Starting thread\n"; +$t = new Thread \&foo, qw(foo bar baz); +print "Joining with $t\n"; +@results = $t->join(); +print "Joining returned @results\n"; diff --git a/ext/Thread/join2.t b/ext/Thread/join2.t new file mode 100644 index 0000000000..99b43a54dc --- /dev/null +++ b/ext/Thread/join2.t @@ -0,0 +1,12 @@ +use Thread; +sub foo { + print "In foo with args: @_\n"; + return (7, 8, 9); +} + +print "Starting thread\n"; +$t = new Thread \&foo, qw(foo bar baz); +sleep 2; +print "Joining with $t\n"; +@results = $t->join(); +print "Joining returned @results\n"; diff --git a/ext/Thread/list.t b/ext/Thread/list.t new file mode 100644 index 0000000000..f13f4b266a --- /dev/null +++ b/ext/Thread/list.t @@ -0,0 +1,30 @@ +use Thread qw(async); +use Thread::Semaphore; + +my $sem = Thread::Semaphore->new(0); + +$nthreads = 4; + +for (my $i = 0; $i < $nthreads; $i++) { + async { + my $tid = Thread->self->tid; + print "thread $tid started...\n"; + $sem->down; + print "thread $tid finishing\n"; + }; +} + +print "main: started $nthreads threads\n"; +sleep 2; + +my @list = Thread->list; +printf "main: Thread->list returned %d threads\n", scalar(@list); + +foreach my $t (@list) { + print "inspecting thread $t...\n"; + print "...deref is $$t\n"; + print "...flags = ", $t->flags, "\n"; + print "...tid = ", $t->tid, "\n"; +} +print "main thread telling workers to finish off...\n"; +$sem->up($nthreads); diff --git a/ext/Thread/lock.t b/ext/Thread/lock.t new file mode 100644 index 0000000000..fefb129879 --- /dev/null +++ b/ext/Thread/lock.t @@ -0,0 +1,27 @@ +use Thread; + +$level = 0; + +sub worker +{ + my $num = shift; + my $i; + print "thread $num starting\n"; + for ($i = 1; $i <= 20; $i++) { + print "thread $num iteration $i\n"; + select(undef, undef, undef, rand(10)/100); + { + lock($lock); + warn "thread $num saw non-zero level = $level\n" if $level; + $level++; + print "thread $num has lock\n"; + select(undef, undef, undef, rand(10)/100); + $level--; + } + print "thread $num released lock\n"; + } +} + +for ($t = 1; $t <= 5; $t++) { + new Thread \&worker, $t; +} diff --git a/ext/Thread/queue.t b/ext/Thread/queue.t new file mode 100644 index 0000000000..4672ba6ee7 --- /dev/null +++ b/ext/Thread/queue.t @@ -0,0 +1,36 @@ +use Thread; +use Thread::Queue; + +$q = new Thread::Queue; + +sub reader { + my $tid = Thread->self->tid; + my $i = 0; + while (1) { + $i++; + print "reader (tid $tid): waiting for element $i...\n"; + my $el = $q->dequeue; + print "reader (tid $tid): dequeued element $i: value $el\n"; + select(undef, undef, undef, rand(2)); + if ($el == -1) { + # end marker + print "reader (tid $tid) returning\n"; + return; + } + } +} + +my $nthreads = 3; + +for (my $i = 0; $i < $nthreads; $i++) { + Thread->new(\&reader, $i); +} + +for (my $i = 1; $i <= 10; $i++) { + my $el = int(rand(100)); + select(undef, undef, undef, rand(2)); + print "writer: enqueuing value $el\n"; + $q->enqueue($el); +} + +$q->enqueue((-1) x $nthreads); # one end marker for each thread diff --git a/ext/Thread/sync.t b/ext/Thread/sync.t new file mode 100644 index 0000000000..9c2e5897da --- /dev/null +++ b/ext/Thread/sync.t @@ -0,0 +1,61 @@ +use Thread; + +$level = 0; + +sub single_file { + use attrs 'locked'; + my $arg = shift; + $level++; + print "Level $level for $arg\n"; + print "(something is wrong)\n" if $level < 0 || $level > 1; + sleep 1; + $level--; + print "Back to level $level\n"; +} + +sub start_bar { + my $i; + print "start bar\n"; + for $i (1..3) { + print "bar $i\n"; + single_file("bar $i"); + sleep 1 if rand > 0.5; + } + print "end bar\n"; + return 1; +} + +sub start_foo { + my $i; + print "start foo\n"; + for $i (1..3) { + print "foo $i\n"; + single_file("foo $i"); + sleep 1 if rand > 0.5; + } + print "end foo\n"; + return 1; +} + +sub start_baz { + my $i; + print "start baz\n"; + for $i (1..3) { + print "baz $i\n"; + single_file("baz $i"); + sleep 1 if rand > 0.5; + } + print "end baz\n"; + return 1; +} + +$| = 1; +srand($$^$^T); + +$foo = new Thread \&start_foo; +$bar = new Thread \&start_bar; +$baz = new Thread \&start_baz; +$foo->join(); +$bar->join(); +$baz->join(); +print "main: threads finished, exiting\n"; diff --git a/ext/Thread/sync2.t b/ext/Thread/sync2.t new file mode 100644 index 0000000000..0901da46a0 --- /dev/null +++ b/ext/Thread/sync2.t @@ -0,0 +1,69 @@ +use Thread; + +$global = undef; + +sub single_file { + use attrs 'locked'; + my $who = shift; + my $i; + + print "Uh oh: $who entered while locked by $global\n" if $global; + $global = $who; + print "["; + for ($i = 0; $i < int(10 * rand); $i++) { + print $who; + select(undef, undef, undef, 0.1); + } + print "]"; + $global = undef; +} + +sub start_a { + my ($i, $j); + for ($j = 0; $j < 10; $j++) { + single_file("A"); + for ($i = 0; $i < int(10 * rand); $i++) { + print "a"; + select(undef, undef, undef, 0.1); + } + } +} + +sub start_b { + my ($i, $j); + for ($j = 0; $j < 10; $j++) { + single_file("B"); + for ($i = 0; $i < int(10 * rand); $i++) { + print "b"; + select(undef, undef, undef, 0.1); + } + } +} + +sub start_c { + my ($i, $j); + for ($j = 0; $j < 10; $j++) { + single_file("C"); + for ($i = 0; $i < int(10 * rand); $i++) { + print "c"; + select(undef, undef, undef, 0.1); + } + } +} + +$| = 1; +srand($$^$^T); + +print <<'EOT'; +Each pair of square brackets [...] should contain a repeated sequence of +a unique upper case letter. Lower case letters may appear randomly both +in and out of the brackets. +EOT +$foo = new Thread \&start_a; +$bar = new Thread \&start_b; +$baz = new Thread \&start_c; +print "\nmain: joining...\n"; +#$foo->join; +#$bar->join; +#$baz->join; +print "\ndone\n"; diff --git a/ext/Thread/typemap b/ext/Thread/typemap new file mode 100644 index 0000000000..fd6e99d947 --- /dev/null +++ b/ext/Thread/typemap @@ -0,0 +1,24 @@ +Thread T_XSCPTR + +INPUT +T_XSCPTR + STMT_START { + MAGIC *mg; + SV *sv = ($arg); + + if (!sv_isobject(sv)) + croak(\"$var is not an object\"); + sv = (SV*)SvRV(sv); + if (!SvRMAGICAL(sv) || !(mg = mg_find(sv, '~')) + || mg->mg_private != ${ntype}_MAGIC_SIGNATURE) + croak(\"XSUB ${func_name}: $var is a forged ${ntype} object\"); + $var = ($type) SvPVX(mg->mg_obj); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), + \"XSUB ${func_name}: %p\\n\", $var);) + } STMT_END +T_IVREF + if (SvROK($arg)) + $var = ($type) SvIV((SV*)SvRV($arg)); + else + croak(\"$var is not a reference\") + diff --git a/ext/Thread/unsync.t b/ext/Thread/unsync.t new file mode 100644 index 0000000000..f0a51efe1f --- /dev/null +++ b/ext/Thread/unsync.t @@ -0,0 +1,37 @@ +use Thread; + +$| = 1; + +if (@ARGV) { + srand($ARGV[0]); +} else { + my $seed = $$ ^ $^T; + print "Randomising to $seed\n"; + srand($seed); +} + +sub whoami { + my ($depth, $a, $b, $c) = @_; + my $i; + print "whoami ($depth): $a $b $c\n"; + sleep 1; + whoami($depth - 1, $a, $b, $c) if $depth > 0; +} + +sub start_foo { + my $r = 3 + int(10 * rand); + print "start_foo: r is $r\n"; + whoami($r, "start_foo", "foo1", "foo2"); + print "start_foo: finished\n"; +} + +sub start_bar { + my $r = 3 + int(10 * rand); + print "start_bar: r is $r\n"; + whoami($r, "start_bar", "bar1", "bar2"); + print "start_bar: finished\n"; +} + +$foo = new Thread \&start_foo; +$bar = new Thread \&start_bar; +print "main: exiting\n"; diff --git a/ext/Thread/unsync2.t b/ext/Thread/unsync2.t new file mode 100644 index 0000000000..fb955ac31e --- /dev/null +++ b/ext/Thread/unsync2.t @@ -0,0 +1,36 @@ +use Thread; + +$| = 1; + +srand($$^$^T); + +sub printargs { + my $thread = shift; + my $arg; + my $i; + while ($arg = shift) { + my $delay = int(rand(500)); + $i++; + print "$thread arg $i is $arg\n"; + 1 while $delay--; + } +} + +sub start_thread { + my $thread = shift; + my $count = 10; + while ($count--) { + my(@args) = ($thread) x int(rand(10)); + print "$thread $count calling printargs @args\n"; + printargs($thread, @args); + } +} + +new Thread (\&start_thread, "A"); +new Thread (\&start_thread, "B"); +#new Thread (\&start_thread, "C"); +#new Thread (\&start_thread, "D"); +#new Thread (\&start_thread, "E"); +#new Thread (\&start_thread, "F"); + +print "main: exiting\n"; diff --git a/ext/Thread/unsync3.t b/ext/Thread/unsync3.t new file mode 100644 index 0000000000..e03e9c8af1 --- /dev/null +++ b/ext/Thread/unsync3.t @@ -0,0 +1,50 @@ +use Thread; + +$| = 1; + +srand($$^$^T); + +sub whoami { + my $thread = shift; + print $thread; +} + +sub uppercase { + my $count = 100; + while ($count--) { + my $i = int(rand(1000)); + 1 while $i--; + print "A"; + $i = int(rand(1000)); + 1 while $i--; + whoami("B"); + } +} + +sub lowercase { + my $count = 100; + while ($count--) { + my $i = int(rand(1000)); + 1 while $i--; + print "x"; + $i = int(rand(1000)); + 1 while $i--; + whoami("y"); + } +} + +sub numbers { + my $count = 100; + while ($count--) { + my $i = int(rand(1000)); + 1 while $i--; + print 1; + $i = int(rand(1000)); + 1 while $i--; + whoami(2); + } +} + +new Thread \&numbers; +new Thread \&uppercase; +new Thread \&lowercase; diff --git a/ext/Thread/unsync4.t b/ext/Thread/unsync4.t new file mode 100644 index 0000000000..494ad2be92 --- /dev/null +++ b/ext/Thread/unsync4.t @@ -0,0 +1,38 @@ +use Thread; + +$| = 1; + +srand($$^$^T); + +sub printargs { + my(@copyargs) = @_; + my $thread = shift @copyargs; + my $arg; + my $i; + while ($arg = shift @copyargs) { + my $delay = int(rand(500)); + $i++; + print "$thread arg $i is $arg\n"; + 1 while $delay--; + } +} + +sub start_thread { + my(@threadargs) = @_; + my $thread = $threadargs[0]; + my $count = 10; + while ($count--) { + my(@args) = ($thread) x int(rand(10)); + print "$thread $count calling printargs @args\n"; + printargs($thread, @args); + } +} + +new Thread (\&start_thread, "A"); +new Thread (\&start_thread, "B"); +new Thread (\&start_thread, "C"); +new Thread (\&start_thread, "D"); +new Thread (\&start_thread, "E"); +new Thread (\&start_thread, "F"); + +print "main: exiting\n"; |