summaryrefslogtreecommitdiff
path: root/ext/Thread
diff options
context:
space:
mode:
authorMalcolm Beattie <mbeattie@sable.ox.ac.uk>1997-10-16 16:42:13 +0000
committerMalcolm Beattie <mbeattie@sable.ox.ac.uk>1997-10-16 16:42:13 +0000
commitc1f329308b702519079b58d1f4c523ce60dc9990 (patch)
tree055f535c918fdf314c15c62968c32da6b1cf1ac7 /ext/Thread
parent8023c3ceb7a7110c55b3159dff471253f72f7e15 (diff)
parent0a00ffdb1e70eb883974513d0ee6f4afd54aca19 (diff)
downloadperl-c1f329308b702519079b58d1f4c523ce60dc9990.tar.gz
Move perlext/Thread into perl/ext/Thread.
p4raw-id: //depot/perl@141
Diffstat (limited to 'ext/Thread')
-rw-r--r--ext/Thread/Makefile.PL2
-rw-r--r--ext/Thread/Notes13
-rw-r--r--ext/Thread/README20
-rw-r--r--ext/Thread/Thread.pm20
-rw-r--r--ext/Thread/Thread.xs550
-rw-r--r--ext/Thread/Thread/Queue.pm22
-rw-r--r--ext/Thread/Thread/Semaphore.pm25
-rw-r--r--ext/Thread/create.t17
-rw-r--r--ext/Thread/io.t25
-rw-r--r--ext/Thread/join.t11
-rw-r--r--ext/Thread/join2.t12
-rw-r--r--ext/Thread/list.t30
-rw-r--r--ext/Thread/lock.t27
-rw-r--r--ext/Thread/queue.t36
-rw-r--r--ext/Thread/sync.t61
-rw-r--r--ext/Thread/sync2.t69
-rw-r--r--ext/Thread/typemap24
-rw-r--r--ext/Thread/unsync.t37
-rw-r--r--ext/Thread/unsync2.t36
-rw-r--r--ext/Thread/unsync3.t50
-rw-r--r--ext/Thread/unsync4.t38
21 files changed, 1125 insertions, 0 deletions
diff --git a/ext/Thread/Makefile.PL b/ext/Thread/Makefile.PL
new file mode 100644
index 0000000000..d699091cc1
--- /dev/null
+++ b/ext/Thread/Makefile.PL
@@ -0,0 +1,2 @@
+use ExtUtils::MakeMaker;
+WriteMakefile(NAME => "Thread");
diff --git a/ext/Thread/Notes b/ext/Thread/Notes
new file mode 100644
index 0000000000..1505877ee9
--- /dev/null
+++ b/ext/Thread/Notes
@@ -0,0 +1,13 @@
+Should cvcache be per CV (keyed by thread) or per thread (keyed by CV)?
+
+Maybe ought to protect all SVs by a mutex for SvREFCNT_{dec,inc},
+upgrades and so on. Then use SvMUTEX instead of CvMUTEX for CVs.
+On the other hand, people shouldn't expect concurrent operations
+on non-lexicals to be safe anyway.
+
+Probably don't need to bother keeping track of CvOWNER on clones.
+
+Either @_ needs to be made lexical or other arrangments need to be
+made so that some globs (or just *_) are per-thread.
+
+tokenbuf and buf probably ought to be global protected by a global lock.
diff --git a/ext/Thread/README b/ext/Thread/README
new file mode 100644
index 0000000000..a6b22fb4ae
--- /dev/null
+++ b/ext/Thread/README
@@ -0,0 +1,20 @@
+See the README.threads in the main perl 5.004_xx development
+distribution (x >= 50) for details of how to build and use this.
+If all else fails, read on.
+
+If your version of patch can't create a file from scratch, then you'll
+need to create an empty thread.h manually first. Perl itself will need
+to be built with -DUSE_THREADS yet. If you're using MIT pthreads or
+another threads package that needs pthread_init() to be called, then
+add -DNEED_PTHREAD_INIT. If you're using a threads library that only
+follows one of the old POSIX drafts, then you'll probably need to add
+-DOLD_PTHREADS_API. I haven't tested -DOLD_PTHREADS_API properly yet
+and I think you may still have to tweak a couple of the mutex calls
+to follow the old API.
+
+This extension is copyright Malcolm Beattie 1995-1997 and is freely
+distributable under your choice of the GNU Public License or the
+Artistic License (see the main perl distribution).
+
+Malcolm Beattie
+mbeattie@sable.ox.ac.uk
diff --git a/ext/Thread/Thread.pm b/ext/Thread/Thread.pm
new file mode 100644
index 0000000000..2ace5dde2d
--- /dev/null
+++ b/ext/Thread/Thread.pm
@@ -0,0 +1,20 @@
+package Thread;
+require Exporter;
+require DynaLoader;
+@ISA = qw(Exporter DynaLoader);
+@EXPORT_OK = qw(yield cond_signal cond_broadcast cond_wait async);
+
+#
+# Methods
+#
+
+#
+# Exported functions
+#
+sub async (&) {
+ return new Thread $_[0];
+}
+
+bootstrap Thread;
+
+1;
diff --git a/ext/Thread/Thread.xs b/ext/Thread/Thread.xs
new file mode 100644
index 0000000000..3dc25162a7
--- /dev/null
+++ b/ext/Thread/Thread.xs
@@ -0,0 +1,550 @@
+#include "EXTERN.h"
+#include "perl.h"
+#include "XSUB.h"
+
+/* Magic signature for Thread's mg_private is "Th" */
+#define Thread_MAGIC_SIGNATURE 0x5468
+
+static U32 threadnum = 0;
+static int sig_pipe[2];
+
+static void
+remove_thread(t)
+Thread t;
+{
+ DEBUG_L(WITH_THR(PerlIO_printf(PerlIO_stderr(),
+ "%p: remove_thread %p\n", thr, t)));
+ MUTEX_LOCK(&threads_mutex);
+ MUTEX_DESTROY(&t->mutex);
+ nthreads--;
+ t->prev->next = t->next;
+ t->next->prev = t->prev;
+ COND_BROADCAST(&nthreads_cond);
+ MUTEX_UNLOCK(&threads_mutex);
+}
+
+static void *
+threadstart(arg)
+void *arg;
+{
+#ifdef FAKE_THREADS
+ Thread savethread = thr;
+ LOGOP myop;
+ dSP;
+ I32 oldscope = scopestack_ix;
+ I32 retval;
+ AV *returnav;
+ int i;
+
+ DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
+ thr, SvPEEK(TOPs)));
+ thr = (Thread) arg;
+ savemark = TOPMARK;
+ thr->prev = thr->prev_run = savethread;
+ thr->next = savethread->next;
+ thr->next_run = savethread->next_run;
+ savethread->next = savethread->next_run = thr;
+ thr->wait_queue = 0;
+ thr->private = 0;
+
+ /* Now duplicate most of perl_call_sv but with a few twists */
+ op = (OP*)&myop;
+ Zero(op, 1, LOGOP);
+ myop.op_flags = OPf_STACKED;
+ myop.op_next = Nullop;
+ myop.op_flags |= OPf_KNOW;
+ myop.op_flags |= OPf_WANT_LIST;
+ op = pp_entersub(ARGS);
+ DEBUG_L(if (!op)
+ PerlIO_printf(PerlIO_stderr(), "thread starts at Nullop\n"));
+ /*
+ * When this thread is next scheduled, we start in the right
+ * place. When the thread runs off the end of the sub, perl.c
+ * handles things, using savemark to figure out how much of the
+ * stack is the return value for any join.
+ */
+ thr = savethread; /* back to the old thread */
+ return 0;
+#else
+ Thread thr = (Thread) arg;
+ LOGOP myop;
+ dSP;
+ I32 oldmark = TOPMARK;
+ I32 oldscope = scopestack_ix;
+ I32 retval;
+ AV *returnav;
+ int i, ret;
+ dJMPENV;
+
+ /* Don't call *anything* requiring dTHR until after pthread_setspecific */
+ /*
+ * Wait until our creator releases us. If we didn't do this, then
+ * it would be potentially possible for out thread to carry on and
+ * do stuff before our creator fills in our "self" field. For example,
+ * if we went and created another thread which tried to pthread_join
+ * with us, then we'd be in a mess.
+ */
+ MUTEX_LOCK(&thr->mutex);
+ MUTEX_UNLOCK(&thr->mutex);
+
+ /*
+ * It's safe to wait until now to set the thread-specific pointer
+ * from our pthread_t structure to our struct thread, since we're
+ * the only thread who can get at it anyway.
+ */
+ if (pthread_setspecific(thr_key, (void *) thr))
+ croak("panic: pthread_setspecific");
+
+ /* Only now can we use SvPEEK (which calls sv_newmortal which does dTHR) */
+ DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
+ thr, SvPEEK(TOPs)));
+
+ JMPENV_PUSH(ret);
+ switch (ret) {
+ case 3:
+ PerlIO_printf(PerlIO_stderr(), "panic: threadstart\n");
+ /* fall through */
+ case 1:
+ STATUS_ALL_FAILURE;
+ /* fall through */
+ case 2:
+ /* my_exit() was called */
+ while (scopestack_ix > oldscope)
+ LEAVE;
+ JMPENV_POP;
+ av_store(returnav, 0, newSViv(statusvalue));
+ goto finishoff;
+ }
+
+ /* Now duplicate most of perl_call_sv but with a few twists */
+ op = (OP*)&myop;
+ Zero(op, 1, LOGOP);
+ myop.op_flags = OPf_STACKED;
+ myop.op_next = Nullop;
+ myop.op_flags |= OPf_KNOW;
+ myop.op_flags |= OPf_WANT_LIST;
+ op = pp_entersub(ARGS);
+ if (op)
+ runops();
+ SPAGAIN;
+ retval = sp - (stack_base + oldmark);
+ sp = stack_base + oldmark + 1;
+ DEBUG_L(for (i = 1; i <= retval; i++)
+ PerlIO_printf(PerlIO_stderr(),
+ "%p returnav[%d] = %s\n",
+ thr, i, SvPEEK(sp[i - 1]));)
+ returnav = newAV();
+ av_store(returnav, 0, newSVpv("", 0));
+ for (i = 1; i <= retval; i++, sp++)
+ sv_setsv(*av_fetch(returnav, i, TRUE), SvREFCNT_inc(*sp));
+
+ finishoff:
+#if 0
+ /* removed for debug */
+ SvREFCNT_dec(curstack);
+#endif
+ SvREFCNT_dec(cvcache);
+ Safefree(markstack);
+ Safefree(scopestack);
+ Safefree(savestack);
+ Safefree(retstack);
+ Safefree(cxstack);
+ Safefree(tmps_stack);
+
+ MUTEX_LOCK(&thr->mutex);
+ DEBUG_L(PerlIO_printf(PerlIO_stderr(),
+ "%p: threadstart finishing: state is %u\n",
+ thr, ThrSTATE(thr)));
+ switch (ThrSTATE(thr)) {
+ case THRf_R_JOINABLE:
+ ThrSETSTATE(thr, THRf_ZOMBIE);
+ MUTEX_UNLOCK(&thr->mutex);
+ DEBUG_L(PerlIO_printf(PerlIO_stderr(),
+ "%p: R_JOINABLE thread finished\n", thr));
+ break;
+ case THRf_R_JOINED:
+ ThrSETSTATE(thr, THRf_DEAD);
+ MUTEX_UNLOCK(&thr->mutex);
+ remove_thread(thr);
+ DEBUG_L(PerlIO_printf(PerlIO_stderr(),
+ "%p: R_JOINED thread finished\n", thr));
+ break;
+ case THRf_R_DETACHED:
+ ThrSETSTATE(thr, THRf_DEAD);
+ MUTEX_UNLOCK(&thr->mutex);
+ SvREFCNT_dec(returnav);
+ DEBUG_L(PerlIO_printf(PerlIO_stderr(),
+ "%p: DETACHED thread finished\n", thr));
+ remove_thread(thr); /* This might trigger main thread to finish */
+ break;
+ default:
+ MUTEX_UNLOCK(&thr->mutex);
+ croak("panic: illegal state %u at end of threadstart", ThrSTATE(thr));
+ /* NOTREACHED */
+ }
+ return (void *) returnav; /* Available for anyone to join with us */
+ /* unless we are detached in which case */
+ /* noone will see the value anyway. */
+#endif
+}
+
+static SV *
+newthread(startsv, initargs, class)
+SV *startsv;
+AV *initargs;
+char *class;
+{
+ dTHR;
+ dSP;
+ Thread savethread;
+ int i;
+ SV *sv;
+ sigset_t fullmask, oldmask;
+
+ savethread = thr;
+ sv = newSVpv("", 0);
+ SvGROW(sv, sizeof(struct thread) + 1);
+ SvCUR_set(sv, sizeof(struct thread));
+ thr = (Thread) SvPVX(sv);
+ DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread(%s) = %p)\n",
+ savethread, SvPEEK(startsv), thr));
+ oursv = sv;
+ /* If we don't zero these foostack pointers, init_stacks won't init them */
+ markstack = 0;
+ scopestack = 0;
+ savestack = 0;
+ retstack = 0;
+ init_stacks(ARGS);
+ curcop = savethread->Tcurcop; /* XXX As good a guess as any? */
+ SPAGAIN;
+ defstash = savethread->Tdefstash; /* XXX maybe these should */
+ curstash = savethread->Tcurstash; /* always be set to main? */
+ /* top_env? */
+ /* runlevel */
+ cvcache = newHV();
+ thr->flags = THRf_R_JOINABLE;
+ MUTEX_INIT(&thr->mutex);
+ thr->tid = ++threadnum;
+ /* Insert new thread into the circular linked list and bump nthreads */
+ MUTEX_LOCK(&threads_mutex);
+ thr->next = savethread->next;
+ thr->prev = savethread;
+ savethread->next = thr;
+ thr->next->prev = thr;
+ nthreads++;
+ MUTEX_UNLOCK(&threads_mutex);
+
+ DEBUG_L(PerlIO_printf(PerlIO_stderr(),
+ "%p: newthread, tid is %u, preparing stack\n",
+ savethread, thr->tid));
+ /* The following pushes the arg list and startsv onto the *new* stack */
+ PUSHMARK(sp);
+ /* Could easily speed up the following greatly */
+ for (i = 0; i <= AvFILL(initargs); i++)
+ XPUSHs(SvREFCNT_inc(*av_fetch(initargs, i, FALSE)));
+ XPUSHs(SvREFCNT_inc(startsv));
+ PUTBACK;
+
+#ifdef FAKE_THREADS
+ threadstart(thr);
+#else
+ /* On your marks... */
+ MUTEX_LOCK(&thr->mutex);
+ /* Get set...
+ * Increment the global thread count.
+ */
+ sigfillset(&fullmask);
+ if (sigprocmask(SIG_SETMASK, &fullmask, &oldmask) == -1)
+ croak("panic: sigprocmask");
+ if (pthread_create(&self, NULL, threadstart, (void*) thr))
+ return NULL; /* XXX should clean up first */
+ /* Go */
+ MUTEX_UNLOCK(&thr->mutex);
+ if (sigprocmask(SIG_SETMASK, &oldmask, 0))
+ croak("panic: sigprocmask");
+#endif
+ sv = newSViv(thr->tid);
+ sv_magic(sv, oursv, '~', 0, 0);
+ SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
+ return sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE));
+}
+
+static Signal_t
+handle_thread_signal(sig)
+int sig;
+{
+ char c = (char) sig;
+ write(sig_pipe[0], &c, 1);
+}
+
+MODULE = Thread PACKAGE = Thread
+
+void
+new(class, startsv, ...)
+ char * class
+ SV * startsv
+ AV * av = av_make(items - 2, &ST(2));
+ PPCODE:
+ XPUSHs(sv_2mortal(newthread(startsv, av, class)));
+
+void
+join(t)
+ Thread t
+ AV * av = NO_INIT
+ int i = NO_INIT
+ PPCODE:
+ DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: joining %p (state %u)\n",
+ thr, t, ThrSTATE(t)););
+ MUTEX_LOCK(&t->mutex);
+ switch (ThrSTATE(t)) {
+ case THRf_R_JOINABLE:
+ case THRf_R_JOINED:
+ ThrSETSTATE(t, THRf_R_JOINED);
+ MUTEX_UNLOCK(&t->mutex);
+ break;
+ case THRf_ZOMBIE:
+ ThrSETSTATE(t, THRf_DEAD);
+ MUTEX_UNLOCK(&t->mutex);
+ remove_thread(t);
+ break;
+ default:
+ MUTEX_UNLOCK(&t->mutex);
+ croak("can't join with thread");
+ /* NOTREACHED */
+ }
+ if (pthread_join(t->Tself, (void **) &av))
+ croak("pthread_join failed");
+
+ /* Could easily speed up the following if necessary */
+ for (i = 0; i <= AvFILL(av); i++)
+ XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE)));
+
+void
+detach(t)
+ Thread t
+ CODE:
+ DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: detaching %p (state %u)\n",
+ thr, t, ThrSTATE(t)););
+ MUTEX_LOCK(&t->mutex);
+ switch (ThrSTATE(t)) {
+ case THRf_R_JOINABLE:
+ ThrSETSTATE(t, THRf_R_DETACHED);
+ /* fall through */
+ case THRf_R_DETACHED:
+ DETACH(t);
+ MUTEX_UNLOCK(&t->mutex);
+ break;
+ case THRf_ZOMBIE:
+ ThrSETSTATE(t, THRf_DEAD);
+ DETACH(t);
+ MUTEX_UNLOCK(&t->mutex);
+ remove_thread(t);
+ break;
+ default:
+ MUTEX_UNLOCK(&t->mutex);
+ croak("can't detach thread");
+ /* NOTREACHED */
+ }
+
+void
+equal(t1, t2)
+ Thread t1
+ Thread t2
+ PPCODE:
+ PUSHs((t1 == t2) ? &sv_yes : &sv_no);
+
+void
+flags(t)
+ Thread t
+ PPCODE:
+ PUSHs(sv_2mortal(newSViv(t->flags)));
+
+void
+self(class)
+ char * class
+ PREINIT:
+ SV *sv;
+ PPCODE:
+ sv = newSViv(thr->tid);
+ sv_magic(sv, oursv, '~', 0, 0);
+ SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
+ PUSHs(sv_2mortal(sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE))));
+
+U32
+tid(t)
+ Thread t
+ CODE:
+ MUTEX_LOCK(&t->mutex);
+ RETVAL = t->tid;
+ MUTEX_UNLOCK(&t->mutex);
+ OUTPUT:
+ RETVAL
+
+void
+DESTROY(t)
+ SV * t
+ PPCODE:
+ PUSHs(&sv_yes);
+
+void
+yield()
+ CODE:
+#ifdef OLD_PTHREADS_API
+ pthread_yield();
+#else
+#ifndef NO_SCHED_YIELD
+ sched_yield();
+#endif /* NO_SCHED_YIELD */
+#endif /* OLD_PTHREADS_API */
+
+void
+cond_wait(sv)
+ SV * sv
+ MAGIC * mg = NO_INIT
+CODE:
+ if (SvROK(sv))
+ sv = SvRV(sv);
+
+ mg = condpair_magic(sv);
+ DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_wait %p\n", thr, sv));
+ MUTEX_LOCK(MgMUTEXP(mg));
+ if (MgOWNER(mg) != thr) {
+ MUTEX_UNLOCK(MgMUTEXP(mg));
+ croak("cond_wait for lock that we don't own\n");
+ }
+ MgOWNER(mg) = 0;
+ COND_WAIT(MgCONDP(mg), MgMUTEXP(mg));
+ while (MgOWNER(mg))
+ COND_WAIT(MgOWNERCONDP(mg), MgMUTEXP(mg));
+ MgOWNER(mg) = thr;
+ MUTEX_UNLOCK(MgMUTEXP(mg));
+
+void
+cond_signal(sv)
+ SV * sv
+ MAGIC * mg = NO_INIT
+CODE:
+ if (SvROK(sv))
+ sv = SvRV(sv);
+
+ mg = condpair_magic(sv);
+ DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_signal %p\n",thr,sv));
+ MUTEX_LOCK(MgMUTEXP(mg));
+ if (MgOWNER(mg) != thr) {
+ MUTEX_UNLOCK(MgMUTEXP(mg));
+ croak("cond_signal for lock that we don't own\n");
+ }
+ COND_SIGNAL(MgCONDP(mg));
+ MUTEX_UNLOCK(MgMUTEXP(mg));
+
+void
+cond_broadcast(sv)
+ SV * sv
+ MAGIC * mg = NO_INIT
+CODE:
+ if (SvROK(sv))
+ sv = SvRV(sv);
+
+ mg = condpair_magic(sv);
+ DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_broadcast %p\n",
+ thr, sv));
+ MUTEX_LOCK(MgMUTEXP(mg));
+ if (MgOWNER(mg) != thr) {
+ MUTEX_UNLOCK(MgMUTEXP(mg));
+ croak("cond_broadcast for lock that we don't own\n");
+ }
+ COND_BROADCAST(MgCONDP(mg));
+ MUTEX_UNLOCK(MgMUTEXP(mg));
+
+void
+list(class)
+ char * class
+ PREINIT:
+ Thread t;
+ AV * av;
+ SV ** svp;
+ int n = 0;
+ PPCODE:
+ av = newAV();
+ /*
+ * Iterate until we have enough dynamic storage for all threads.
+ * We mustn't do any allocation while holding threads_mutex though.
+ */
+ MUTEX_LOCK(&threads_mutex);
+ do {
+ n = nthreads;
+ MUTEX_UNLOCK(&threads_mutex);
+ if (AvFILL(av) < n - 1) {
+ int i = AvFILL(av);
+ for (i = AvFILL(av); i < n - 1; i++) {
+ SV *sv = newSViv(0); /* fill in tid later */
+ sv_magic(sv, 0, '~', 0, 0); /* fill in other magic later */
+ av_push(av, sv_bless(newRV_noinc(sv),
+ gv_stashpv(class, TRUE)));
+
+ }
+ }
+ MUTEX_LOCK(&threads_mutex);
+ } while (n < nthreads);
+ n = nthreads; /* Get the final correct value */
+
+ /*
+ * At this point, there's enough room to fill in av.
+ * Note that we are holding threads_mutex so the list
+ * won't change out from under us but all the remaining
+ * processing is "fast" (no blocking, malloc etc.)
+ */
+ t = thr;
+ svp = AvARRAY(av);
+ do {
+ SV *sv = (SV*)SvRV(*svp);
+ sv_setiv(sv, t->tid);
+ SvMAGIC(sv)->mg_obj = SvREFCNT_inc(t->Toursv);
+ SvMAGIC(sv)->mg_flags |= MGf_REFCOUNTED;
+ SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
+ t = t->next;
+ svp++;
+ } while (t != thr);
+ /* */
+ MUTEX_UNLOCK(&threads_mutex);
+ /* Truncate any unneeded slots in av */
+ av_fill(av, n - 1);
+ /* Finally, push all the new objects onto the stack and drop av */
+ EXTEND(sp, n);
+ for (svp = AvARRAY(av); n > 0; n--, svp++)
+ PUSHs(*svp);
+ (void)sv_2mortal((SV*)av);
+
+
+MODULE = Thread PACKAGE = Thread::Signal
+
+void
+kill_sighandler_thread()
+ PPCODE:
+ write(sig_pipe[0], "\0", 1);
+ PUSHs(&sv_yes);
+
+void
+init_thread_signals()
+ PPCODE:
+ sighandlerp = handle_thread_signal;
+ if (pipe(sig_pipe) == -1)
+ XSRETURN_UNDEF;
+ PUSHs(&sv_yes);
+
+SV *
+await_signal()
+ PREINIT:
+ char c;
+ ssize_t ret;
+ CODE:
+ do {
+ ret = read(sig_pipe[1], &c, 1);
+ } while (ret == -1 && errno == EINTR);
+ if (ret == -1)
+ croak("panic: await_signal");
+ if (ret == 0)
+ XSRETURN_UNDEF;
+ RETVAL = c ? psig_ptr[c] : &sv_no;
+ OUTPUT:
+ RETVAL
diff --git a/ext/Thread/Thread/Queue.pm b/ext/Thread/Thread/Queue.pm
new file mode 100644
index 0000000000..4eef978bd6
--- /dev/null
+++ b/ext/Thread/Thread/Queue.pm
@@ -0,0 +1,22 @@
+package Thread::Queue;
+use Thread qw(cond_wait cond_broadcast);
+
+sub new {
+ my $class = shift;
+ return bless [@_], $class;
+}
+
+sub dequeue {
+ use attrs qw(locked method);
+ my $q = shift;
+ cond_wait $q until @$q;
+ return shift @$q;
+}
+
+sub enqueue {
+ use attrs qw(locked method);
+ my $q = shift;
+ push(@$q, @_) and cond_broadcast $q;
+}
+
+1;
diff --git a/ext/Thread/Thread/Semaphore.pm b/ext/Thread/Thread/Semaphore.pm
new file mode 100644
index 0000000000..9e5852f15c
--- /dev/null
+++ b/ext/Thread/Thread/Semaphore.pm
@@ -0,0 +1,25 @@
+package Thread::Semaphore;
+use Thread qw(cond_wait cond_broadcast);
+
+sub new {
+ my $class = shift;
+ my $val = @_ ? shift : 1;
+ bless \$val, $class;
+}
+
+sub down {
+ use attrs qw(locked method);
+ my $s = shift;
+ my $inc = @_ ? shift : 1;
+ cond_wait $s until $$s >= $inc;
+ $$s -= $inc;
+}
+
+sub up {
+ use attrs qw(locked method);
+ my $s = shift;
+ my $inc = @_ ? shift : 1;
+ ($$s += $inc) > 0 and cond_broadcast $s;
+}
+
+1;
diff --git a/ext/Thread/create.t b/ext/Thread/create.t
new file mode 100644
index 0000000000..7d6d189e92
--- /dev/null
+++ b/ext/Thread/create.t
@@ -0,0 +1,17 @@
+use Thread;
+sub start_here {
+ my $i;
+ print "In start_here with args: @_\n";
+ for ($i = 1; $i <= 5; $i++) {
+ print "start_here: $i\n";
+ sleep 1;
+ }
+}
+
+print "Starting new thread now\n";
+$t = new Thread \&start_here, qw(foo bar baz);
+print "Started thread $t\n";
+for ($count = 1; $count <= 5; $count++) {
+ print "main: $count\n";
+ sleep 1;
+}
diff --git a/ext/Thread/io.t b/ext/Thread/io.t
new file mode 100644
index 0000000000..8ade26504d
--- /dev/null
+++ b/ext/Thread/io.t
@@ -0,0 +1,25 @@
+use Thread;
+
+sub reader {
+ my $line;
+ while ($line = <STDIN>) {
+ print "reader: $line";
+ }
+ print "End of input in reader\n";
+ return 0;
+}
+
+print <<'EOT';
+This test starts up a thread to read and echo whatever is typed on
+the keyboard/stdin, line by line, while the main thread counts down
+to zero. The test stays running until both the main thread has
+finished counting down and the I/O thread has seen end-of-file on
+the terminal/stdin.
+EOT
+
+$r = new Thread \&reader;
+$count = 10;
+while ($count--) {
+ sleep 1;
+ print "ping $count\n";
+}
diff --git a/ext/Thread/join.t b/ext/Thread/join.t
new file mode 100644
index 0000000000..640256a9b3
--- /dev/null
+++ b/ext/Thread/join.t
@@ -0,0 +1,11 @@
+use Thread;
+sub foo {
+ print "In foo with args: @_\n";
+ return (7, 8, 9);
+}
+
+print "Starting thread\n";
+$t = new Thread \&foo, qw(foo bar baz);
+print "Joining with $t\n";
+@results = $t->join();
+print "Joining returned @results\n";
diff --git a/ext/Thread/join2.t b/ext/Thread/join2.t
new file mode 100644
index 0000000000..99b43a54dc
--- /dev/null
+++ b/ext/Thread/join2.t
@@ -0,0 +1,12 @@
+use Thread;
+sub foo {
+ print "In foo with args: @_\n";
+ return (7, 8, 9);
+}
+
+print "Starting thread\n";
+$t = new Thread \&foo, qw(foo bar baz);
+sleep 2;
+print "Joining with $t\n";
+@results = $t->join();
+print "Joining returned @results\n";
diff --git a/ext/Thread/list.t b/ext/Thread/list.t
new file mode 100644
index 0000000000..f13f4b266a
--- /dev/null
+++ b/ext/Thread/list.t
@@ -0,0 +1,30 @@
+use Thread qw(async);
+use Thread::Semaphore;
+
+my $sem = Thread::Semaphore->new(0);
+
+$nthreads = 4;
+
+for (my $i = 0; $i < $nthreads; $i++) {
+ async {
+ my $tid = Thread->self->tid;
+ print "thread $tid started...\n";
+ $sem->down;
+ print "thread $tid finishing\n";
+ };
+}
+
+print "main: started $nthreads threads\n";
+sleep 2;
+
+my @list = Thread->list;
+printf "main: Thread->list returned %d threads\n", scalar(@list);
+
+foreach my $t (@list) {
+ print "inspecting thread $t...\n";
+ print "...deref is $$t\n";
+ print "...flags = ", $t->flags, "\n";
+ print "...tid = ", $t->tid, "\n";
+}
+print "main thread telling workers to finish off...\n";
+$sem->up($nthreads);
diff --git a/ext/Thread/lock.t b/ext/Thread/lock.t
new file mode 100644
index 0000000000..fefb129879
--- /dev/null
+++ b/ext/Thread/lock.t
@@ -0,0 +1,27 @@
+use Thread;
+
+$level = 0;
+
+sub worker
+{
+ my $num = shift;
+ my $i;
+ print "thread $num starting\n";
+ for ($i = 1; $i <= 20; $i++) {
+ print "thread $num iteration $i\n";
+ select(undef, undef, undef, rand(10)/100);
+ {
+ lock($lock);
+ warn "thread $num saw non-zero level = $level\n" if $level;
+ $level++;
+ print "thread $num has lock\n";
+ select(undef, undef, undef, rand(10)/100);
+ $level--;
+ }
+ print "thread $num released lock\n";
+ }
+}
+
+for ($t = 1; $t <= 5; $t++) {
+ new Thread \&worker, $t;
+}
diff --git a/ext/Thread/queue.t b/ext/Thread/queue.t
new file mode 100644
index 0000000000..4672ba6ee7
--- /dev/null
+++ b/ext/Thread/queue.t
@@ -0,0 +1,36 @@
+use Thread;
+use Thread::Queue;
+
+$q = new Thread::Queue;
+
+sub reader {
+ my $tid = Thread->self->tid;
+ my $i = 0;
+ while (1) {
+ $i++;
+ print "reader (tid $tid): waiting for element $i...\n";
+ my $el = $q->dequeue;
+ print "reader (tid $tid): dequeued element $i: value $el\n";
+ select(undef, undef, undef, rand(2));
+ if ($el == -1) {
+ # end marker
+ print "reader (tid $tid) returning\n";
+ return;
+ }
+ }
+}
+
+my $nthreads = 3;
+
+for (my $i = 0; $i < $nthreads; $i++) {
+ Thread->new(\&reader, $i);
+}
+
+for (my $i = 1; $i <= 10; $i++) {
+ my $el = int(rand(100));
+ select(undef, undef, undef, rand(2));
+ print "writer: enqueuing value $el\n";
+ $q->enqueue($el);
+}
+
+$q->enqueue((-1) x $nthreads); # one end marker for each thread
diff --git a/ext/Thread/sync.t b/ext/Thread/sync.t
new file mode 100644
index 0000000000..9c2e5897da
--- /dev/null
+++ b/ext/Thread/sync.t
@@ -0,0 +1,61 @@
+use Thread;
+
+$level = 0;
+
+sub single_file {
+ use attrs 'locked';
+ my $arg = shift;
+ $level++;
+ print "Level $level for $arg\n";
+ print "(something is wrong)\n" if $level < 0 || $level > 1;
+ sleep 1;
+ $level--;
+ print "Back to level $level\n";
+}
+
+sub start_bar {
+ my $i;
+ print "start bar\n";
+ for $i (1..3) {
+ print "bar $i\n";
+ single_file("bar $i");
+ sleep 1 if rand > 0.5;
+ }
+ print "end bar\n";
+ return 1;
+}
+
+sub start_foo {
+ my $i;
+ print "start foo\n";
+ for $i (1..3) {
+ print "foo $i\n";
+ single_file("foo $i");
+ sleep 1 if rand > 0.5;
+ }
+ print "end foo\n";
+ return 1;
+}
+
+sub start_baz {
+ my $i;
+ print "start baz\n";
+ for $i (1..3) {
+ print "baz $i\n";
+ single_file("baz $i");
+ sleep 1 if rand > 0.5;
+ }
+ print "end baz\n";
+ return 1;
+}
+
+$| = 1;
+srand($$^$^T);
+
+$foo = new Thread \&start_foo;
+$bar = new Thread \&start_bar;
+$baz = new Thread \&start_baz;
+$foo->join();
+$bar->join();
+$baz->join();
+print "main: threads finished, exiting\n";
diff --git a/ext/Thread/sync2.t b/ext/Thread/sync2.t
new file mode 100644
index 0000000000..0901da46a0
--- /dev/null
+++ b/ext/Thread/sync2.t
@@ -0,0 +1,69 @@
+use Thread;
+
+$global = undef;
+
+sub single_file {
+ use attrs 'locked';
+ my $who = shift;
+ my $i;
+
+ print "Uh oh: $who entered while locked by $global\n" if $global;
+ $global = $who;
+ print "[";
+ for ($i = 0; $i < int(10 * rand); $i++) {
+ print $who;
+ select(undef, undef, undef, 0.1);
+ }
+ print "]";
+ $global = undef;
+}
+
+sub start_a {
+ my ($i, $j);
+ for ($j = 0; $j < 10; $j++) {
+ single_file("A");
+ for ($i = 0; $i < int(10 * rand); $i++) {
+ print "a";
+ select(undef, undef, undef, 0.1);
+ }
+ }
+}
+
+sub start_b {
+ my ($i, $j);
+ for ($j = 0; $j < 10; $j++) {
+ single_file("B");
+ for ($i = 0; $i < int(10 * rand); $i++) {
+ print "b";
+ select(undef, undef, undef, 0.1);
+ }
+ }
+}
+
+sub start_c {
+ my ($i, $j);
+ for ($j = 0; $j < 10; $j++) {
+ single_file("C");
+ for ($i = 0; $i < int(10 * rand); $i++) {
+ print "c";
+ select(undef, undef, undef, 0.1);
+ }
+ }
+}
+
+$| = 1;
+srand($$^$^T);
+
+print <<'EOT';
+Each pair of square brackets [...] should contain a repeated sequence of
+a unique upper case letter. Lower case letters may appear randomly both
+in and out of the brackets.
+EOT
+$foo = new Thread \&start_a;
+$bar = new Thread \&start_b;
+$baz = new Thread \&start_c;
+print "\nmain: joining...\n";
+#$foo->join;
+#$bar->join;
+#$baz->join;
+print "\ndone\n";
diff --git a/ext/Thread/typemap b/ext/Thread/typemap
new file mode 100644
index 0000000000..fd6e99d947
--- /dev/null
+++ b/ext/Thread/typemap
@@ -0,0 +1,24 @@
+Thread T_XSCPTR
+
+INPUT
+T_XSCPTR
+ STMT_START {
+ MAGIC *mg;
+ SV *sv = ($arg);
+
+ if (!sv_isobject(sv))
+ croak(\"$var is not an object\");
+ sv = (SV*)SvRV(sv);
+ if (!SvRMAGICAL(sv) || !(mg = mg_find(sv, '~'))
+ || mg->mg_private != ${ntype}_MAGIC_SIGNATURE)
+ croak(\"XSUB ${func_name}: $var is a forged ${ntype} object\");
+ $var = ($type) SvPVX(mg->mg_obj);
+ DEBUG_L(PerlIO_printf(PerlIO_stderr(),
+ \"XSUB ${func_name}: %p\\n\", $var);)
+ } STMT_END
+T_IVREF
+ if (SvROK($arg))
+ $var = ($type) SvIV((SV*)SvRV($arg));
+ else
+ croak(\"$var is not a reference\")
+
diff --git a/ext/Thread/unsync.t b/ext/Thread/unsync.t
new file mode 100644
index 0000000000..f0a51efe1f
--- /dev/null
+++ b/ext/Thread/unsync.t
@@ -0,0 +1,37 @@
+use Thread;
+
+$| = 1;
+
+if (@ARGV) {
+ srand($ARGV[0]);
+} else {
+ my $seed = $$ ^ $^T;
+ print "Randomising to $seed\n";
+ srand($seed);
+}
+
+sub whoami {
+ my ($depth, $a, $b, $c) = @_;
+ my $i;
+ print "whoami ($depth): $a $b $c\n";
+ sleep 1;
+ whoami($depth - 1, $a, $b, $c) if $depth > 0;
+}
+
+sub start_foo {
+ my $r = 3 + int(10 * rand);
+ print "start_foo: r is $r\n";
+ whoami($r, "start_foo", "foo1", "foo2");
+ print "start_foo: finished\n";
+}
+
+sub start_bar {
+ my $r = 3 + int(10 * rand);
+ print "start_bar: r is $r\n";
+ whoami($r, "start_bar", "bar1", "bar2");
+ print "start_bar: finished\n";
+}
+
+$foo = new Thread \&start_foo;
+$bar = new Thread \&start_bar;
+print "main: exiting\n";
diff --git a/ext/Thread/unsync2.t b/ext/Thread/unsync2.t
new file mode 100644
index 0000000000..fb955ac31e
--- /dev/null
+++ b/ext/Thread/unsync2.t
@@ -0,0 +1,36 @@
+use Thread;
+
+$| = 1;
+
+srand($$^$^T);
+
+sub printargs {
+ my $thread = shift;
+ my $arg;
+ my $i;
+ while ($arg = shift) {
+ my $delay = int(rand(500));
+ $i++;
+ print "$thread arg $i is $arg\n";
+ 1 while $delay--;
+ }
+}
+
+sub start_thread {
+ my $thread = shift;
+ my $count = 10;
+ while ($count--) {
+ my(@args) = ($thread) x int(rand(10));
+ print "$thread $count calling printargs @args\n";
+ printargs($thread, @args);
+ }
+}
+
+new Thread (\&start_thread, "A");
+new Thread (\&start_thread, "B");
+#new Thread (\&start_thread, "C");
+#new Thread (\&start_thread, "D");
+#new Thread (\&start_thread, "E");
+#new Thread (\&start_thread, "F");
+
+print "main: exiting\n";
diff --git a/ext/Thread/unsync3.t b/ext/Thread/unsync3.t
new file mode 100644
index 0000000000..e03e9c8af1
--- /dev/null
+++ b/ext/Thread/unsync3.t
@@ -0,0 +1,50 @@
+use Thread;
+
+$| = 1;
+
+srand($$^$^T);
+
+sub whoami {
+ my $thread = shift;
+ print $thread;
+}
+
+sub uppercase {
+ my $count = 100;
+ while ($count--) {
+ my $i = int(rand(1000));
+ 1 while $i--;
+ print "A";
+ $i = int(rand(1000));
+ 1 while $i--;
+ whoami("B");
+ }
+}
+
+sub lowercase {
+ my $count = 100;
+ while ($count--) {
+ my $i = int(rand(1000));
+ 1 while $i--;
+ print "x";
+ $i = int(rand(1000));
+ 1 while $i--;
+ whoami("y");
+ }
+}
+
+sub numbers {
+ my $count = 100;
+ while ($count--) {
+ my $i = int(rand(1000));
+ 1 while $i--;
+ print 1;
+ $i = int(rand(1000));
+ 1 while $i--;
+ whoami(2);
+ }
+}
+
+new Thread \&numbers;
+new Thread \&uppercase;
+new Thread \&lowercase;
diff --git a/ext/Thread/unsync4.t b/ext/Thread/unsync4.t
new file mode 100644
index 0000000000..494ad2be92
--- /dev/null
+++ b/ext/Thread/unsync4.t
@@ -0,0 +1,38 @@
+use Thread;
+
+$| = 1;
+
+srand($$^$^T);
+
+sub printargs {
+ my(@copyargs) = @_;
+ my $thread = shift @copyargs;
+ my $arg;
+ my $i;
+ while ($arg = shift @copyargs) {
+ my $delay = int(rand(500));
+ $i++;
+ print "$thread arg $i is $arg\n";
+ 1 while $delay--;
+ }
+}
+
+sub start_thread {
+ my(@threadargs) = @_;
+ my $thread = $threadargs[0];
+ my $count = 10;
+ while ($count--) {
+ my(@args) = ($thread) x int(rand(10));
+ print "$thread $count calling printargs @args\n";
+ printargs($thread, @args);
+ }
+}
+
+new Thread (\&start_thread, "A");
+new Thread (\&start_thread, "B");
+new Thread (\&start_thread, "C");
+new Thread (\&start_thread, "D");
+new Thread (\&start_thread, "E");
+new Thread (\&start_thread, "F");
+
+print "main: exiting\n";