diff options
author | srinivas%netscape.com <devnull@localhost> | 2000-02-16 16:07:34 +0000 |
---|---|---|
committer | srinivas%netscape.com <devnull@localhost> | 2000-02-16 16:07:34 +0000 |
commit | 6641f90ccfcc81c947b6a4da02502288dc5bad4a (patch) | |
tree | f07eb42e5b9dca073eb875cc516fa97e44747f60 | |
parent | 7fcd92a33126522c53b6b85ccc312de5e39edc7f (diff) | |
download | nspr-hg-6641f90ccfcc81c947b6a4da02502288dc5bad4a.tar.gz |
Modified PR_CancelJob to allow cancellation of IO and timer jobs only.
-rw-r--r-- | pr/include/prtpool.h | 6 | ||||
-rw-r--r-- | pr/src/misc/prtpool.c | 216 | ||||
-rw-r--r-- | pr/tests/thrpool_server.c | 78 |
3 files changed, 183 insertions, 117 deletions
diff --git a/pr/include/prtpool.h b/pr/include/prtpool.h index e35ca4d7..838031fc 100644 --- a/pr/include/prtpool.h +++ b/pr/include/prtpool.h @@ -24,6 +24,12 @@ #include "prio.h" #include "prerror.h" +/* + * NOTE: + * THIS API IS A PRELIMINARY VERSION IN NSPR 4.0 AND IS SUBJECT TO + * CHANGE + */ + PR_BEGIN_EXTERN_C typedef struct PRJobIoDesc { diff --git a/pr/src/misc/prtpool.c b/pr/src/misc/prtpool.c index 7d3b196b..b694b94a 100644 --- a/pr/src/misc/prtpool.c +++ b/pr/src/misc/prtpool.c @@ -87,6 +87,7 @@ struct PRThreadPool { tp_jobq jobq; io_jobq ioq; timer_jobq timerq; + PRLock *join_lock; /* used with jobp->join_cv */ PRCondVar *shutdown_cv; PRBool shutdown; }; @@ -94,10 +95,6 @@ struct PRThreadPool { typedef enum io_op_type { JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type; -typedef enum _PRJobStatus - { JOB_ON_TIMERQ, JOB_ON_IOQ, JOB_QUEUED, JOB_RUNNING, JOB_COMPLETED, - JOB_CANCELED, JOB_FREED } _PRJobStatus; - #ifdef OPT_WINNT typedef struct NT_notifier { OVERLAPPED overlapped; /* must be first */ @@ -107,12 +104,14 @@ typedef struct NT_notifier { struct PRJob { PRCList links; /* for linking jobs */ - _PRJobStatus status; - PRBool joinable; + PRBool on_ioq; /* job on ioq */ + PRBool on_timerq; /* job on timerq */ PRJobFn job_func; void *job_arg; - PRLock *jlock; PRCondVar *join_cv; + PRBool join_wait; /* == PR_TRUE, when waiting to join */ + PRCondVar *cancel_cv; /* for cancelling IO jobs */ + PRBool cancel_io; /* for cancelling IO jobs */ PRThreadPool *tpool; /* back pointer to thread pool */ PRJobIoDesc *iod; io_op_type io_op; @@ -131,9 +130,28 @@ struct PRJob { #define WTHREAD_LINKS_PTR(_qp) \ ((wthread *) ((char *) (_qp) - offsetof(wthread, links))) +#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv) + +#define JOIN_NOTIFY(_jobp) \ + PR_BEGIN_MACRO \ + PR_Lock(_jobp->tpool->join_lock); \ + _jobp->join_wait = PR_FALSE; \ + PR_NotifyCondVar(_jobp->join_cv); \ + PR_Unlock(_jobp->tpool->join_lock); \ + PR_END_MACRO + +#define CANCEL_IO_JOB(jobp) \ + PR_BEGIN_MACRO \ + jobp->cancel_io = PR_FALSE; \ + jobp->on_ioq = PR_FALSE; \ + PR_REMOVE_AND_INIT_LINK(&jobp->links); \ + tp->ioq.cnt--; \ + PR_NotifyCondVar(jobp->cancel_cv); \ + PR_END_MACRO + static void delete_job(PRJob *jobp); static PRThreadPool * alloc_threadpool(); -static PRJob * alloc_job(PRBool joinable); +static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp); static void notify_ioq(PRThreadPool *tp); static void notify_timerq(PRThreadPool *tp); @@ -144,9 +162,6 @@ static void notify_timerq(PRThreadPool *tp); * | * V * tp->jobq->lock - * | - * V - * jobp->jlock */ /* @@ -181,9 +196,6 @@ PRCList *head; tp->idle_threads--; tp->jobq.cnt--; PR_Unlock(tp->jobq.lock); - PR_Lock(jobp->jlock); - jobp->status = JOB_RUNNING; - PR_Unlock(jobp->jlock); #else PR_Lock(tp->jobq.lock); @@ -203,20 +215,14 @@ PRCList *head; PR_REMOVE_AND_INIT_LINK(head); tp->jobq.cnt--; jobp = JOB_LINKS_PTR(head); - PR_Lock(jobp->jlock); - jobp->status = JOB_RUNNING; - PR_Unlock(jobp->jlock); PR_Unlock(tp->jobq.lock); #endif jobp->job_func(jobp->job_arg); - if (!jobp->joinable) { + if (!JOINABLE_JOB(jobp)) { delete_job(jobp); } else { - PR_Lock(jobp->jlock); - jobp->status = JOB_COMPLETED; - PR_NotifyCondVar(jobp->join_cv); - PR_Unlock(jobp->jlock); + JOIN_NOTIFY(jobp); } } PR_Lock(tp->jobq.lock); @@ -245,9 +251,6 @@ add_to_jobq(PRThreadPool *tp, PRJob *jobp) #else PR_Lock(tp->jobq.lock); PR_APPEND_LINK(&jobp->links,&tp->jobq.list); - PR_Lock(jobp->jlock); - jobp->status = JOB_QUEUED; - PR_Unlock(jobp->jlock); tp->jobq.cnt++; if ((tp->idle_threads < tp->jobq.cnt) && (tp->current_threads < tp->max_threads)) { @@ -345,6 +348,10 @@ PRIntervalTime now; PR_Lock(tp->ioq.lock); for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = qp->next) { jobp = JOB_LINKS_PTR(qp); + if (jobp->cancel_io) { + CANCEL_IO_JOB(jobp); + continue; + } if (pollfds_used == (pollfd_cnt)) break; pollfds[pollfds_used].fd = jobp->iod->socket; @@ -409,8 +416,14 @@ PRIntervalTime now; ((events & PR_POLL_WRITE) && (revents & PR_POLL_HUP))) { /* write op & hup */ PR_Lock(tp->ioq.lock); + if (jobp->cancel_io) { + CANCEL_IO_JOB(jobp); + PR_Unlock(tp->ioq.lock); + continue; + } PR_REMOVE_AND_INIT_LINK(&jobp->links); tp->ioq.cnt--; + jobp->on_ioq = PR_FALSE; PR_Unlock(tp->ioq.lock); /* set error */ @@ -430,8 +443,14 @@ PRIntervalTime now; * add to jobq */ PR_Lock(tp->ioq.lock); + if (jobp->cancel_io) { + CANCEL_IO_JOB(jobp); + PR_Unlock(tp->ioq.lock); + continue; + } PR_REMOVE_AND_INIT_LINK(&jobp->links); tp->ioq.cnt--; + jobp->on_ioq = PR_FALSE; PR_Unlock(tp->ioq.lock); if (jobp->io_op == JOB_IO_CONNECT) { @@ -453,6 +472,10 @@ PRIntervalTime now; PR_Lock(tp->ioq.lock); for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = qp->next) { jobp = JOB_LINKS_PTR(qp); + if (jobp->cancel_io) { + CANCEL_IO_JOB(jobp); + continue; + } if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) break; if ((PR_INTERVAL_NO_WAIT != jobp->timeout) && @@ -460,6 +483,7 @@ PRIntervalTime now; break; PR_REMOVE_AND_INIT_LINK(&jobp->links); tp->ioq.cnt--; + jobp->on_ioq = PR_FALSE; jobp->iod->error = PR_IO_TIMEOUT_ERROR; add_to_jobq(tp, jobp); } @@ -518,6 +542,7 @@ PRIntervalTime now; */ PR_REMOVE_AND_INIT_LINK(&jobp->links); tp->timerq.cnt--; + jobp->on_timerq = PR_FALSE; add_to_jobq(tp, jobp); } PR_Unlock(tp->timerq.lock); @@ -534,6 +559,8 @@ delete_threadpool(PRThreadPool *tp) PR_DestroyCondVar(tp->jobq.cv); if (NULL != tp->jobq.lock) PR_DestroyLock(tp->jobq.lock); + if (NULL != tp->join_lock) + PR_DestroyLock(tp->join_lock); #ifdef OPT_WINNT if (NULL != tp->jobq.nt_completion_port) CloseHandle(tp->jobq.nt_completion_port); @@ -569,6 +596,9 @@ PRThreadPool *tp; tp->jobq.cv = PR_NewCondVar(tp->jobq.lock); if (NULL == tp->jobq.cv) goto failed; + tp->join_lock = PR_NewLock(); + if (NULL == tp->join_lock) + goto failed; #ifdef OPT_WINNT tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); @@ -667,32 +697,29 @@ static void delete_job(PRJob *jobp) { if (NULL != jobp) { - if (NULL != jobp->jlock) { - PR_DestroyLock(jobp->jlock); - jobp->jlock = NULL; - } if (NULL != jobp->join_cv) { PR_DestroyCondVar(jobp->join_cv); jobp->join_cv = NULL; } - jobp->status = JOB_FREED; + if (NULL != jobp->cancel_cv) { + PR_DestroyCondVar(jobp->cancel_cv); + jobp->cancel_cv = NULL; + } PR_DELETE(jobp); } } static PRJob * -alloc_job(PRBool joinable) +alloc_job(PRBool joinable, PRThreadPool *tp) { PRJob *jobp; jobp = PR_NEWZAP(PRJob); if (NULL == jobp) goto failed; - jobp->jlock = PR_NewLock(); - if (NULL == jobp->jlock) - goto failed; if (joinable) { - jobp->join_cv = PR_NewCondVar(jobp->jlock); + jobp->join_cv = PR_NewCondVar(tp->join_lock); + jobp->join_wait = PR_TRUE; if (NULL == jobp->join_cv) goto failed; } else { @@ -714,14 +741,13 @@ PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable) { PRJob *jobp; - jobp = alloc_job(joinable); + jobp = alloc_job(joinable, tpool); if (NULL == jobp) return NULL; jobp->job_func = fn; jobp->job_arg = arg; jobp->tpool = tpool; - jobp->joinable = joinable; add_to_jobq(tpool, jobp); return jobp; @@ -735,7 +761,7 @@ queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, PRJob *jobp; PRIntervalTime now; - jobp = alloc_job(joinable); + jobp = alloc_job(joinable, tpool); if (NULL == jobp) { return NULL; } @@ -747,9 +773,7 @@ queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, jobp->job_func = fn; jobp->job_arg = arg; - jobp->status = JOB_ON_IOQ; jobp->tpool = tpool; - jobp->joinable = joinable; jobp->iod = iod; if (JOB_IO_READ == op) { jobp->io_op = JOB_IO_READ; @@ -802,6 +826,7 @@ queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, PR_INSERT_AFTER(&jobp->links,qp); } + jobp->on_ioq = PR_TRUE; tpool->ioq.cnt++; /* * notify io worker thread(s) @@ -878,7 +903,7 @@ PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout, */ return(PR_QueueJob(tpool, fn, arg, joinable)); } - jobp = alloc_job(joinable); + jobp = alloc_job(joinable, tpool); if (NULL == jobp) { return NULL; } @@ -890,9 +915,7 @@ PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout, jobp->job_func = fn; jobp->job_arg = arg; - jobp->status = JOB_ON_TIMERQ; jobp->tpool = tpool; - jobp->joinable = joinable; jobp->timeout = timeout; now = PR_IntervalNow(); @@ -900,6 +923,7 @@ PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout, PR_Lock(tpool->timerq.lock); + jobp->on_timerq = PR_TRUE; if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) PR_APPEND_LINK(&jobp->links,&tpool->timerq.list); else { @@ -958,27 +982,64 @@ PR_CancelJob(PRJob *jobp) { PRStatus rval = PR_FAILURE; PRThreadPool *tp; - if (JOB_QUEUED == jobp->status) { + if (jobp->on_timerq) { /* - * now, check again while holding thread pool lock + * now, check again while holding the timerq lock */ tp = jobp->tpool; - PR_Lock(tp->jobq.lock); - PR_Lock(jobp->jlock); - if (JOB_QUEUED == jobp->status) { + PR_Lock(tp->timerq.lock); + if (jobp->on_timerq) { + jobp->on_timerq = PR_FALSE; PR_REMOVE_AND_INIT_LINK(&jobp->links); - if (!jobp->joinable) { - PR_Unlock(jobp->jlock); + tp->timerq.cnt--; + PR_Unlock(tp->timerq.lock); + if (!JOINABLE_JOB(jobp)) { delete_job(jobp); } else { - jobp->status = JOB_CANCELED; - PR_NotifyCondVar(jobp->join_cv); - PR_Unlock(jobp->jlock); + JOIN_NOTIFY(jobp); } rval = PR_SUCCESS; - } - PR_Unlock(tp->jobq.lock); + } else + PR_Unlock(tp->timerq.lock); + } else if (jobp->on_ioq) { + /* + * now, check again while holding the ioq lock + */ + tp = jobp->tpool; + PR_Lock(tp->ioq.lock); + if (jobp->on_ioq) { + jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock); + if (NULL == jobp->cancel_cv) { + PR_Unlock(tp->ioq.lock); + PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0); + return PR_FAILURE; + } + /* + * mark job 'cancelled' and notify io thread(s) + * XXXX: + * this assumes there is only one io thread; when there + * are multiple threads, the io thread processing this job + * must be notified. + */ + jobp->cancel_io = PR_TRUE; + PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */ + notify_ioq(tp); + PR_Lock(tp->ioq.lock); + while (jobp->cancel_io) + PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT); + PR_Unlock(tp->ioq.lock); + PR_ASSERT(!jobp->on_ioq); + if (!JOINABLE_JOB(jobp)) { + delete_job(jobp); + } else { + JOIN_NOTIFY(jobp); + } + rval = PR_SUCCESS; + } else + PR_Unlock(tp->ioq.lock); } + if (PR_FAILURE == rval) + PR_SetError(PR_INVALID_STATE_ERROR, 0); return rval; } @@ -986,19 +1047,14 @@ PR_CancelJob(PRJob *jobp) { PR_IMPLEMENT(PRStatus) PR_JoinJob(PRJob *jobp) { - /* - * No references to the thread pool - */ - if (!jobp->joinable) { + if (!JOINABLE_JOB(jobp)) { PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); return PR_FAILURE; } - PR_Lock(jobp->jlock); - while((JOB_COMPLETED != jobp->status) && - (JOB_CANCELED != jobp->status)) + PR_Lock(jobp->tpool->join_lock); + while(jobp->join_wait) PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT); - - PR_Unlock(jobp->jlock); + PR_Unlock(jobp->tpool->join_lock); delete_job(jobp); return PR_SUCCESS; } @@ -1099,7 +1155,7 @@ PRStatus rval_status; } /* - * Delete unjoinable jobs; joinable jobs must be reclaimed by the user + * Delete queued jobs */ while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) { PRJob *jobp; @@ -1108,17 +1164,10 @@ PRStatus rval_status; PR_REMOVE_AND_INIT_LINK(head); jobp = JOB_LINKS_PTR(head); tpool->jobq.cnt--; - - if (!jobp->joinable) { - delete_job(jobp); - } else { - PR_Lock(jobp->jlock); - jobp->status = JOB_CANCELED; - PR_NotifyCondVar(jobp->join_cv); - PR_Unlock(jobp->jlock); - } + delete_job(jobp); } + /* delete io jobs */ while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) { PRJob *jobp; @@ -1126,16 +1175,10 @@ PRStatus rval_status; PR_REMOVE_AND_INIT_LINK(head); tpool->ioq.cnt--; jobp = JOB_LINKS_PTR(head); - if (!jobp->joinable) { - delete_job(jobp); - } else { - PR_Lock(jobp->jlock); - jobp->status = JOB_CANCELED; - PR_NotifyCondVar(jobp->join_cv); - PR_Unlock(jobp->jlock); - } + delete_job(jobp); } + /* delete timer jobs */ while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) { PRJob *jobp; @@ -1143,14 +1186,7 @@ PRStatus rval_status; PR_REMOVE_AND_INIT_LINK(head); tpool->timerq.cnt--; jobp = JOB_LINKS_PTR(head); - if (!jobp->joinable) { - delete_job(jobp); - } else { - PR_Lock(jobp->jlock); - jobp->status = JOB_CANCELED; - PR_NotifyCondVar(jobp->join_cv); - PR_Unlock(jobp->jlock); - } + delete_job(jobp); } PR_ASSERT(0 == tpool->jobq.cnt); diff --git a/pr/tests/thrpool_server.c b/pr/tests/thrpool_server.c index 51966acb..56efcce9 100644 --- a/pr/tests/thrpool_server.c +++ b/pr/tests/thrpool_server.c @@ -250,6 +250,27 @@ Serve_Client(void *arg) PR_DELETE(scp); } +static void +print_stats(void *arg) +{ + Server_Param *sp = (Server_Param *) arg; + PRThreadPool *tp = sp->tp; + PRInt32 counter; + PRJob *jobp; + + PR_EnterMonitor(sp->exit_mon); + counter = (*sp->job_counterp); + PR_ExitMonitor(sp->exit_mon); + + printf("PRINT_STATS: #client connections = %d\n",counter); + + + jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500), + print_stats, sp, PR_FALSE); + + PR_ASSERT(NULL != jobp); +} + static int job_counter = 0; /* * TCP Server @@ -272,6 +293,7 @@ TCP_Server(void *arg) PRMonitor *sc_mon; PRJob *jobp; int i; + PRStatus rval; /* * Create a tcp socket @@ -317,6 +339,28 @@ TCP_Server(void *arg) "TCP_Server: PR_BIND netaddr.inet.ip = 0x%lx, netaddr.inet.port = %d\n", netaddr.inet.ip, netaddr.inet.port)); + sp = PR_NEW(Server_Param); + if (sp == NULL) { + fprintf(stderr,"%s: PR_NEW failed\n", program_name); + failed_already=1; + return; + } + sp->iod.socket = sockfd; + sp->iod.timeout = PR_SecondsToInterval(60); + sp->datalen = tcp_mesg_size; + sp->exit_mon = sc_mon; + sp->job_counterp = &job_counter; + sp->conn_counter = 0; + sp->tp = tp; + sp->netaddr = netaddr; + + /* create and cancel an io job */ + jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp, + PR_FALSE); + PR_ASSERT(NULL != jobp); + rval = PR_CancelJob(jobp); + PR_ASSERT(PR_SUCCESS == rval); + /* * create the client process */ @@ -362,12 +406,6 @@ TCP_Server(void *arg) return; } - sp = PR_NEW(Server_Param); - if (sp == NULL) { - fprintf(stderr,"%s: PR_NEW failed\n", program_name); - failed_already=1; - return; - } sp->iod.socket = sockfd; sp->iod.timeout = PR_SecondsToInterval(60); sp->datalen = tcp_mesg_size; @@ -377,6 +415,13 @@ TCP_Server(void *arg) sp->tp = tp; sp->netaddr = netaddr; + /* create and cancel a timer job */ + jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(5000), + print_stats, sp, PR_FALSE); + PR_ASSERT(NULL != jobp); + rval = PR_CancelJob(jobp); + PR_ASSERT(PR_SUCCESS == rval); + DPRINTF(("TCP_Server: Accepting connections \n")); jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp, @@ -386,27 +431,6 @@ TCP_Server(void *arg) } static void -print_stats(void *arg) -{ - Server_Param *sp = (Server_Param *) arg; - PRThreadPool *tp = sp->tp; - PRInt32 counter; - PRJob *jobp; - - PR_EnterMonitor(sp->exit_mon); - counter = (*sp->job_counterp); - PR_ExitMonitor(sp->exit_mon); - - printf("PRINT_STATS: #client connections = %d\n",counter); - - - jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500), - print_stats, sp, PR_FALSE); - - PR_ASSERT(NULL != jobp); -} - -static void TCP_Server_Accept(void *arg) { Server_Param *sp = (Server_Param *) arg; |