summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsrinivas%netscape.com <devnull@localhost>2000-02-16 16:07:34 +0000
committersrinivas%netscape.com <devnull@localhost>2000-02-16 16:07:34 +0000
commit6641f90ccfcc81c947b6a4da02502288dc5bad4a (patch)
treef07eb42e5b9dca073eb875cc516fa97e44747f60
parent7fcd92a33126522c53b6b85ccc312de5e39edc7f (diff)
downloadnspr-hg-6641f90ccfcc81c947b6a4da02502288dc5bad4a.tar.gz
Modified PR_CancelJob to allow cancellation of IO and timer jobs only.
-rw-r--r--pr/include/prtpool.h6
-rw-r--r--pr/src/misc/prtpool.c216
-rw-r--r--pr/tests/thrpool_server.c78
3 files changed, 183 insertions, 117 deletions
diff --git a/pr/include/prtpool.h b/pr/include/prtpool.h
index e35ca4d7..838031fc 100644
--- a/pr/include/prtpool.h
+++ b/pr/include/prtpool.h
@@ -24,6 +24,12 @@
#include "prio.h"
#include "prerror.h"
+/*
+ * NOTE:
+ * THIS API IS A PRELIMINARY VERSION IN NSPR 4.0 AND IS SUBJECT TO
+ * CHANGE
+ */
+
PR_BEGIN_EXTERN_C
typedef struct PRJobIoDesc {
diff --git a/pr/src/misc/prtpool.c b/pr/src/misc/prtpool.c
index 7d3b196b..b694b94a 100644
--- a/pr/src/misc/prtpool.c
+++ b/pr/src/misc/prtpool.c
@@ -87,6 +87,7 @@ struct PRThreadPool {
tp_jobq jobq;
io_jobq ioq;
timer_jobq timerq;
+ PRLock *join_lock; /* used with jobp->join_cv */
PRCondVar *shutdown_cv;
PRBool shutdown;
};
@@ -94,10 +95,6 @@ struct PRThreadPool {
typedef enum io_op_type
{ JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type;
-typedef enum _PRJobStatus
- { JOB_ON_TIMERQ, JOB_ON_IOQ, JOB_QUEUED, JOB_RUNNING, JOB_COMPLETED,
- JOB_CANCELED, JOB_FREED } _PRJobStatus;
-
#ifdef OPT_WINNT
typedef struct NT_notifier {
OVERLAPPED overlapped; /* must be first */
@@ -107,12 +104,14 @@ typedef struct NT_notifier {
struct PRJob {
PRCList links; /* for linking jobs */
- _PRJobStatus status;
- PRBool joinable;
+ PRBool on_ioq; /* job on ioq */
+ PRBool on_timerq; /* job on timerq */
PRJobFn job_func;
void *job_arg;
- PRLock *jlock;
PRCondVar *join_cv;
+ PRBool join_wait; /* == PR_TRUE, when waiting to join */
+ PRCondVar *cancel_cv; /* for cancelling IO jobs */
+ PRBool cancel_io; /* for cancelling IO jobs */
PRThreadPool *tpool; /* back pointer to thread pool */
PRJobIoDesc *iod;
io_op_type io_op;
@@ -131,9 +130,28 @@ struct PRJob {
#define WTHREAD_LINKS_PTR(_qp) \
((wthread *) ((char *) (_qp) - offsetof(wthread, links)))
+#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)
+
+#define JOIN_NOTIFY(_jobp) \
+ PR_BEGIN_MACRO \
+ PR_Lock(_jobp->tpool->join_lock); \
+ _jobp->join_wait = PR_FALSE; \
+ PR_NotifyCondVar(_jobp->join_cv); \
+ PR_Unlock(_jobp->tpool->join_lock); \
+ PR_END_MACRO
+
+#define CANCEL_IO_JOB(jobp) \
+ PR_BEGIN_MACRO \
+ jobp->cancel_io = PR_FALSE; \
+ jobp->on_ioq = PR_FALSE; \
+ PR_REMOVE_AND_INIT_LINK(&jobp->links); \
+ tp->ioq.cnt--; \
+ PR_NotifyCondVar(jobp->cancel_cv); \
+ PR_END_MACRO
+
static void delete_job(PRJob *jobp);
static PRThreadPool * alloc_threadpool();
-static PRJob * alloc_job(PRBool joinable);
+static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp);
static void notify_ioq(PRThreadPool *tp);
static void notify_timerq(PRThreadPool *tp);
@@ -144,9 +162,6 @@ static void notify_timerq(PRThreadPool *tp);
* |
* V
* tp->jobq->lock
- * |
- * V
- * jobp->jlock
*/
/*
@@ -181,9 +196,6 @@ PRCList *head;
tp->idle_threads--;
tp->jobq.cnt--;
PR_Unlock(tp->jobq.lock);
- PR_Lock(jobp->jlock);
- jobp->status = JOB_RUNNING;
- PR_Unlock(jobp->jlock);
#else
PR_Lock(tp->jobq.lock);
@@ -203,20 +215,14 @@ PRCList *head;
PR_REMOVE_AND_INIT_LINK(head);
tp->jobq.cnt--;
jobp = JOB_LINKS_PTR(head);
- PR_Lock(jobp->jlock);
- jobp->status = JOB_RUNNING;
- PR_Unlock(jobp->jlock);
PR_Unlock(tp->jobq.lock);
#endif
jobp->job_func(jobp->job_arg);
- if (!jobp->joinable) {
+ if (!JOINABLE_JOB(jobp)) {
delete_job(jobp);
} else {
- PR_Lock(jobp->jlock);
- jobp->status = JOB_COMPLETED;
- PR_NotifyCondVar(jobp->join_cv);
- PR_Unlock(jobp->jlock);
+ JOIN_NOTIFY(jobp);
}
}
PR_Lock(tp->jobq.lock);
@@ -245,9 +251,6 @@ add_to_jobq(PRThreadPool *tp, PRJob *jobp)
#else
PR_Lock(tp->jobq.lock);
PR_APPEND_LINK(&jobp->links,&tp->jobq.list);
- PR_Lock(jobp->jlock);
- jobp->status = JOB_QUEUED;
- PR_Unlock(jobp->jlock);
tp->jobq.cnt++;
if ((tp->idle_threads < tp->jobq.cnt) &&
(tp->current_threads < tp->max_threads)) {
@@ -345,6 +348,10 @@ PRIntervalTime now;
PR_Lock(tp->ioq.lock);
for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = qp->next) {
jobp = JOB_LINKS_PTR(qp);
+ if (jobp->cancel_io) {
+ CANCEL_IO_JOB(jobp);
+ continue;
+ }
if (pollfds_used == (pollfd_cnt))
break;
pollfds[pollfds_used].fd = jobp->iod->socket;
@@ -409,8 +416,14 @@ PRIntervalTime now;
((events & PR_POLL_WRITE) &&
(revents & PR_POLL_HUP))) { /* write op & hup */
PR_Lock(tp->ioq.lock);
+ if (jobp->cancel_io) {
+ CANCEL_IO_JOB(jobp);
+ PR_Unlock(tp->ioq.lock);
+ continue;
+ }
PR_REMOVE_AND_INIT_LINK(&jobp->links);
tp->ioq.cnt--;
+ jobp->on_ioq = PR_FALSE;
PR_Unlock(tp->ioq.lock);
/* set error */
@@ -430,8 +443,14 @@ PRIntervalTime now;
* add to jobq
*/
PR_Lock(tp->ioq.lock);
+ if (jobp->cancel_io) {
+ CANCEL_IO_JOB(jobp);
+ PR_Unlock(tp->ioq.lock);
+ continue;
+ }
PR_REMOVE_AND_INIT_LINK(&jobp->links);
tp->ioq.cnt--;
+ jobp->on_ioq = PR_FALSE;
PR_Unlock(tp->ioq.lock);
if (jobp->io_op == JOB_IO_CONNECT) {
@@ -453,6 +472,10 @@ PRIntervalTime now;
PR_Lock(tp->ioq.lock);
for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = qp->next) {
jobp = JOB_LINKS_PTR(qp);
+ if (jobp->cancel_io) {
+ CANCEL_IO_JOB(jobp);
+ continue;
+ }
if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
break;
if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
@@ -460,6 +483,7 @@ PRIntervalTime now;
break;
PR_REMOVE_AND_INIT_LINK(&jobp->links);
tp->ioq.cnt--;
+ jobp->on_ioq = PR_FALSE;
jobp->iod->error = PR_IO_TIMEOUT_ERROR;
add_to_jobq(tp, jobp);
}
@@ -518,6 +542,7 @@ PRIntervalTime now;
*/
PR_REMOVE_AND_INIT_LINK(&jobp->links);
tp->timerq.cnt--;
+ jobp->on_timerq = PR_FALSE;
add_to_jobq(tp, jobp);
}
PR_Unlock(tp->timerq.lock);
@@ -534,6 +559,8 @@ delete_threadpool(PRThreadPool *tp)
PR_DestroyCondVar(tp->jobq.cv);
if (NULL != tp->jobq.lock)
PR_DestroyLock(tp->jobq.lock);
+ if (NULL != tp->join_lock)
+ PR_DestroyLock(tp->join_lock);
#ifdef OPT_WINNT
if (NULL != tp->jobq.nt_completion_port)
CloseHandle(tp->jobq.nt_completion_port);
@@ -569,6 +596,9 @@ PRThreadPool *tp;
tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
if (NULL == tp->jobq.cv)
goto failed;
+ tp->join_lock = PR_NewLock();
+ if (NULL == tp->join_lock)
+ goto failed;
#ifdef OPT_WINNT
tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
NULL, 0, 0);
@@ -667,32 +697,29 @@ static void
delete_job(PRJob *jobp)
{
if (NULL != jobp) {
- if (NULL != jobp->jlock) {
- PR_DestroyLock(jobp->jlock);
- jobp->jlock = NULL;
- }
if (NULL != jobp->join_cv) {
PR_DestroyCondVar(jobp->join_cv);
jobp->join_cv = NULL;
}
- jobp->status = JOB_FREED;
+ if (NULL != jobp->cancel_cv) {
+ PR_DestroyCondVar(jobp->cancel_cv);
+ jobp->cancel_cv = NULL;
+ }
PR_DELETE(jobp);
}
}
static PRJob *
-alloc_job(PRBool joinable)
+alloc_job(PRBool joinable, PRThreadPool *tp)
{
PRJob *jobp;
jobp = PR_NEWZAP(PRJob);
if (NULL == jobp)
goto failed;
- jobp->jlock = PR_NewLock();
- if (NULL == jobp->jlock)
- goto failed;
if (joinable) {
- jobp->join_cv = PR_NewCondVar(jobp->jlock);
+ jobp->join_cv = PR_NewCondVar(tp->join_lock);
+ jobp->join_wait = PR_TRUE;
if (NULL == jobp->join_cv)
goto failed;
} else {
@@ -714,14 +741,13 @@ PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable)
{
PRJob *jobp;
- jobp = alloc_job(joinable);
+ jobp = alloc_job(joinable, tpool);
if (NULL == jobp)
return NULL;
jobp->job_func = fn;
jobp->job_arg = arg;
jobp->tpool = tpool;
- jobp->joinable = joinable;
add_to_jobq(tpool, jobp);
return jobp;
@@ -735,7 +761,7 @@ queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
PRJob *jobp;
PRIntervalTime now;
- jobp = alloc_job(joinable);
+ jobp = alloc_job(joinable, tpool);
if (NULL == jobp) {
return NULL;
}
@@ -747,9 +773,7 @@ queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
jobp->job_func = fn;
jobp->job_arg = arg;
- jobp->status = JOB_ON_IOQ;
jobp->tpool = tpool;
- jobp->joinable = joinable;
jobp->iod = iod;
if (JOB_IO_READ == op) {
jobp->io_op = JOB_IO_READ;
@@ -802,6 +826,7 @@ queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
PR_INSERT_AFTER(&jobp->links,qp);
}
+ jobp->on_ioq = PR_TRUE;
tpool->ioq.cnt++;
/*
* notify io worker thread(s)
@@ -878,7 +903,7 @@ PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,
*/
return(PR_QueueJob(tpool, fn, arg, joinable));
}
- jobp = alloc_job(joinable);
+ jobp = alloc_job(joinable, tpool);
if (NULL == jobp) {
return NULL;
}
@@ -890,9 +915,7 @@ PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,
jobp->job_func = fn;
jobp->job_arg = arg;
- jobp->status = JOB_ON_TIMERQ;
jobp->tpool = tpool;
- jobp->joinable = joinable;
jobp->timeout = timeout;
now = PR_IntervalNow();
@@ -900,6 +923,7 @@ PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,
PR_Lock(tpool->timerq.lock);
+ jobp->on_timerq = PR_TRUE;
if (PR_CLIST_IS_EMPTY(&tpool->timerq.list))
PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);
else {
@@ -958,27 +982,64 @@ PR_CancelJob(PRJob *jobp) {
PRStatus rval = PR_FAILURE;
PRThreadPool *tp;
- if (JOB_QUEUED == jobp->status) {
+ if (jobp->on_timerq) {
/*
- * now, check again while holding thread pool lock
+ * now, check again while holding the timerq lock
*/
tp = jobp->tpool;
- PR_Lock(tp->jobq.lock);
- PR_Lock(jobp->jlock);
- if (JOB_QUEUED == jobp->status) {
+ PR_Lock(tp->timerq.lock);
+ if (jobp->on_timerq) {
+ jobp->on_timerq = PR_FALSE;
PR_REMOVE_AND_INIT_LINK(&jobp->links);
- if (!jobp->joinable) {
- PR_Unlock(jobp->jlock);
+ tp->timerq.cnt--;
+ PR_Unlock(tp->timerq.lock);
+ if (!JOINABLE_JOB(jobp)) {
delete_job(jobp);
} else {
- jobp->status = JOB_CANCELED;
- PR_NotifyCondVar(jobp->join_cv);
- PR_Unlock(jobp->jlock);
+ JOIN_NOTIFY(jobp);
}
rval = PR_SUCCESS;
- }
- PR_Unlock(tp->jobq.lock);
+ } else
+ PR_Unlock(tp->timerq.lock);
+ } else if (jobp->on_ioq) {
+ /*
+ * now, check again while holding the ioq lock
+ */
+ tp = jobp->tpool;
+ PR_Lock(tp->ioq.lock);
+ if (jobp->on_ioq) {
+ jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
+ if (NULL == jobp->cancel_cv) {
+ PR_Unlock(tp->ioq.lock);
+ PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
+ return PR_FAILURE;
+ }
+ /*
+ * mark job 'cancelled' and notify io thread(s)
+ * XXXX:
+ * this assumes there is only one io thread; when there
+ * are multiple threads, the io thread processing this job
+ * must be notified.
+ */
+ jobp->cancel_io = PR_TRUE;
+ PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */
+ notify_ioq(tp);
+ PR_Lock(tp->ioq.lock);
+ while (jobp->cancel_io)
+ PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
+ PR_Unlock(tp->ioq.lock);
+ PR_ASSERT(!jobp->on_ioq);
+ if (!JOINABLE_JOB(jobp)) {
+ delete_job(jobp);
+ } else {
+ JOIN_NOTIFY(jobp);
+ }
+ rval = PR_SUCCESS;
+ } else
+ PR_Unlock(tp->ioq.lock);
}
+ if (PR_FAILURE == rval)
+ PR_SetError(PR_INVALID_STATE_ERROR, 0);
return rval;
}
@@ -986,19 +1047,14 @@ PR_CancelJob(PRJob *jobp) {
PR_IMPLEMENT(PRStatus)
PR_JoinJob(PRJob *jobp)
{
- /*
- * No references to the thread pool
- */
- if (!jobp->joinable) {
+ if (!JOINABLE_JOB(jobp)) {
PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
return PR_FAILURE;
}
- PR_Lock(jobp->jlock);
- while((JOB_COMPLETED != jobp->status) &&
- (JOB_CANCELED != jobp->status))
+ PR_Lock(jobp->tpool->join_lock);
+ while(jobp->join_wait)
PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
-
- PR_Unlock(jobp->jlock);
+ PR_Unlock(jobp->tpool->join_lock);
delete_job(jobp);
return PR_SUCCESS;
}
@@ -1099,7 +1155,7 @@ PRStatus rval_status;
}
/*
- * Delete unjoinable jobs; joinable jobs must be reclaimed by the user
+ * Delete queued jobs
*/
while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
PRJob *jobp;
@@ -1108,17 +1164,10 @@ PRStatus rval_status;
PR_REMOVE_AND_INIT_LINK(head);
jobp = JOB_LINKS_PTR(head);
tpool->jobq.cnt--;
-
- if (!jobp->joinable) {
- delete_job(jobp);
- } else {
- PR_Lock(jobp->jlock);
- jobp->status = JOB_CANCELED;
- PR_NotifyCondVar(jobp->join_cv);
- PR_Unlock(jobp->jlock);
- }
+ delete_job(jobp);
}
+ /* delete io jobs */
while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
PRJob *jobp;
@@ -1126,16 +1175,10 @@ PRStatus rval_status;
PR_REMOVE_AND_INIT_LINK(head);
tpool->ioq.cnt--;
jobp = JOB_LINKS_PTR(head);
- if (!jobp->joinable) {
- delete_job(jobp);
- } else {
- PR_Lock(jobp->jlock);
- jobp->status = JOB_CANCELED;
- PR_NotifyCondVar(jobp->join_cv);
- PR_Unlock(jobp->jlock);
- }
+ delete_job(jobp);
}
+ /* delete timer jobs */
while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
PRJob *jobp;
@@ -1143,14 +1186,7 @@ PRStatus rval_status;
PR_REMOVE_AND_INIT_LINK(head);
tpool->timerq.cnt--;
jobp = JOB_LINKS_PTR(head);
- if (!jobp->joinable) {
- delete_job(jobp);
- } else {
- PR_Lock(jobp->jlock);
- jobp->status = JOB_CANCELED;
- PR_NotifyCondVar(jobp->join_cv);
- PR_Unlock(jobp->jlock);
- }
+ delete_job(jobp);
}
PR_ASSERT(0 == tpool->jobq.cnt);
diff --git a/pr/tests/thrpool_server.c b/pr/tests/thrpool_server.c
index 51966acb..56efcce9 100644
--- a/pr/tests/thrpool_server.c
+++ b/pr/tests/thrpool_server.c
@@ -250,6 +250,27 @@ Serve_Client(void *arg)
PR_DELETE(scp);
}
+static void
+print_stats(void *arg)
+{
+ Server_Param *sp = (Server_Param *) arg;
+ PRThreadPool *tp = sp->tp;
+ PRInt32 counter;
+ PRJob *jobp;
+
+ PR_EnterMonitor(sp->exit_mon);
+ counter = (*sp->job_counterp);
+ PR_ExitMonitor(sp->exit_mon);
+
+ printf("PRINT_STATS: #client connections = %d\n",counter);
+
+
+ jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500),
+ print_stats, sp, PR_FALSE);
+
+ PR_ASSERT(NULL != jobp);
+}
+
static int job_counter = 0;
/*
* TCP Server
@@ -272,6 +293,7 @@ TCP_Server(void *arg)
PRMonitor *sc_mon;
PRJob *jobp;
int i;
+ PRStatus rval;
/*
* Create a tcp socket
@@ -317,6 +339,28 @@ TCP_Server(void *arg)
"TCP_Server: PR_BIND netaddr.inet.ip = 0x%lx, netaddr.inet.port = %d\n",
netaddr.inet.ip, netaddr.inet.port));
+ sp = PR_NEW(Server_Param);
+ if (sp == NULL) {
+ fprintf(stderr,"%s: PR_NEW failed\n", program_name);
+ failed_already=1;
+ return;
+ }
+ sp->iod.socket = sockfd;
+ sp->iod.timeout = PR_SecondsToInterval(60);
+ sp->datalen = tcp_mesg_size;
+ sp->exit_mon = sc_mon;
+ sp->job_counterp = &job_counter;
+ sp->conn_counter = 0;
+ sp->tp = tp;
+ sp->netaddr = netaddr;
+
+ /* create and cancel an io job */
+ jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
+ PR_FALSE);
+ PR_ASSERT(NULL != jobp);
+ rval = PR_CancelJob(jobp);
+ PR_ASSERT(PR_SUCCESS == rval);
+
/*
* create the client process
*/
@@ -362,12 +406,6 @@ TCP_Server(void *arg)
return;
}
- sp = PR_NEW(Server_Param);
- if (sp == NULL) {
- fprintf(stderr,"%s: PR_NEW failed\n", program_name);
- failed_already=1;
- return;
- }
sp->iod.socket = sockfd;
sp->iod.timeout = PR_SecondsToInterval(60);
sp->datalen = tcp_mesg_size;
@@ -377,6 +415,13 @@ TCP_Server(void *arg)
sp->tp = tp;
sp->netaddr = netaddr;
+ /* create and cancel a timer job */
+ jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(5000),
+ print_stats, sp, PR_FALSE);
+ PR_ASSERT(NULL != jobp);
+ rval = PR_CancelJob(jobp);
+ PR_ASSERT(PR_SUCCESS == rval);
+
DPRINTF(("TCP_Server: Accepting connections \n"));
jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
@@ -386,27 +431,6 @@ TCP_Server(void *arg)
}
static void
-print_stats(void *arg)
-{
- Server_Param *sp = (Server_Param *) arg;
- PRThreadPool *tp = sp->tp;
- PRInt32 counter;
- PRJob *jobp;
-
- PR_EnterMonitor(sp->exit_mon);
- counter = (*sp->job_counterp);
- PR_ExitMonitor(sp->exit_mon);
-
- printf("PRINT_STATS: #client connections = %d\n",counter);
-
-
- jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500),
- print_stats, sp, PR_FALSE);
-
- PR_ASSERT(NULL != jobp);
-}
-
-static void
TCP_Server_Accept(void *arg)
{
Server_Param *sp = (Server_Param *) arg;