diff options
author | wtc%netscape.com <devnull@localhost> | 2000-01-08 15:17:59 +0000 |
---|---|---|
committer | wtc%netscape.com <devnull@localhost> | 2000-01-08 15:17:59 +0000 |
commit | 0467143cda36bafeb97449ad5c383ffda849ed3a (patch) | |
tree | e961fe3f57af1bf8dfc5fd30db615043b68897b2 | |
parent | 8a4a2812d64b542d62e3331a1f16c7b15295428b (diff) | |
download | nspr-hg-0467143cda36bafeb97449ad5c383ffda849ed3a.tar.gz |
Merged the fixes in NSPR 3.5.1 to the main trunk.
Modified files: _win95.h, _winnt.h, primpl.h, prfdcach.c, prfile.c,
ntio.c, w95io.c, ptio.c, ptthread.c
-rw-r--r-- | pr/include/md/_win95.h | 1 | ||||
-rw-r--r-- | pr/include/md/_winnt.h | 1 | ||||
-rw-r--r-- | pr/include/private/primpl.h | 35 | ||||
-rw-r--r-- | pr/src/io/prfdcach.c | 4 | ||||
-rw-r--r-- | pr/src/io/prfile.c | 21 | ||||
-rw-r--r-- | pr/src/md/windows/ntio.c | 10 | ||||
-rw-r--r-- | pr/src/md/windows/w95io.c | 9 | ||||
-rw-r--r-- | pr/src/pthreads/ptio.c | 861 | ||||
-rw-r--r-- | pr/src/pthreads/ptthread.c | 7 |
9 files changed, 173 insertions, 776 deletions
diff --git a/pr/include/md/_win95.h b/pr/include/md/_win95.h index e82ddfe3..6878b152 100644 --- a/pr/include/md/_win95.h +++ b/pr/include/md/_win95.h @@ -245,6 +245,7 @@ extern PRInt32 _MD_CloseSocket(PRInt32 osfd); #define _MD_SOCKET _PR_MD_SOCKET extern PRInt32 _MD_SocketAvailable(PRFileDesc *fd); #define _MD_SOCKETAVAILABLE _MD_SocketAvailable +#define _MD_PIPEAVAILABLE _PR_MD_PIPEAVAILABLE #define _MD_CONNECT _PR_MD_CONNECT extern PRInt32 _MD_Accept(PRFileDesc *fd, PRNetAddr *raddr, PRUint32 *rlen, PRIntervalTime timeout); diff --git a/pr/include/md/_winnt.h b/pr/include/md/_winnt.h index 77db2ddb..85ddd505 100644 --- a/pr/include/md/_winnt.h +++ b/pr/include/md/_winnt.h @@ -247,6 +247,7 @@ extern int _PR_NTFiberSafeSelect(int, fd_set *, fd_set *, fd_set *, const struct timeval *); #define _MD_FSYNC _PR_MD_FSYNC #define _MD_SOCKETAVAILABLE _PR_MD_SOCKETAVAILABLE +#define _MD_PIPEAVAILABLE _PR_MD_PIPEAVAILABLE #define _MD_SET_FD_INHERITABLE _PR_MD_SET_FD_INHERITABLE #define _MD_INIT_ATOMIC() diff --git a/pr/include/private/primpl.h b/pr/include/private/primpl.h index ad468d2c..8b96c424 100644 --- a/pr/include/private/primpl.h +++ b/pr/include/private/primpl.h @@ -186,11 +186,6 @@ struct _PT_Notified typedef struct PTDebug { PRTime timeStarted; - PRUintn predictionsFoiled; - PRUintn pollingListMax; - PRUintn continuationsServed; - PRUintn recyclesNeeded; - PRUintn quiescentIO; PRUintn locks_created, locks_destroyed; PRUintn locks_acquired, locks_released; PRUintn cvars_created, cvars_destroyed; @@ -1421,9 +1416,7 @@ struct PRThread { #if defined(_PR_PTHREADS) pthread_t id; /* pthread identifier for the thread */ PRBool okToDelete; /* ok to delete the PRThread struct? */ - PRCondVar *io_cv; /* a condition used to run i/o */ PRCondVar *waiting; /* where the thread is waiting | NULL */ - PRIntn io_tq_index; /* the io-queue index for this thread */ void *sp; /* recorded sp for garbage collection */ PRThread *next, *prev; /* simple linked list of all threads */ PRUint32 suspend; /* used to store suspend and resume flags */ @@ -1549,36 +1542,8 @@ struct PRFilePrivate { PRFileDesc *next; PRIntn lockCount; _MDFileDesc md; -#ifdef _PR_PTHREADS - PRIntn eventMask[1]; /* An array of _pt_tq_count bitmasks. - * eventMask[i] is only accessed by - * the i-th i/o continuation thread. - * A 0 in a bitmask means the event - * should be igored in the revents - * bitmask returned by poll. - * - * poll's revents bitmask is a short, - * but we need to declare eventMask - * as an array of PRIntn's so that - * each bitmask can be updated - * individually without disturbing - * adjacent memory. Only the lower - * 16 bits of each PRIntn are used. */ -#endif -/* IMPORTANT: eventMask MUST BE THE LAST FIELD OF THIS STRUCTURE */ }; -/* - * The actual size of the PRFilePrivate structure, - * including the eventMask array at the end - */ -#ifdef _PR_PTHREADS -extern PRIntn _pt_tq_count; -#define PRFILEPRIVATE_SIZE (sizeof(PRFilePrivate) + (_pt_tq_count-1) * sizeof(PRIntn)) -#else -#define PRFILEPRIVATE_SIZE sizeof(PRFilePrivate) -#endif - struct PRDir { PRDirEntry d; _MDDir md; diff --git a/pr/src/io/prfdcach.c b/pr/src/io/prfdcach.c index 6ad2e84c..00947d31 100644 --- a/pr/src/io/prfdcach.c +++ b/pr/src/io/prfdcach.c @@ -115,14 +115,14 @@ finished: fd->dtor = NULL; fd->lower = fd->higher = NULL; fd->identity = PR_NSPR_IO_LAYER; - memset(fd->secret, 0, PRFILEPRIVATE_SIZE); + memset(fd->secret, 0, sizeof(PRFilePrivate)); return fd; allocate: fd = PR_NEW(PRFileDesc); if (NULL != fd) { - fd->secret = (PRFilePrivate *) PR_MALLOC(PRFILEPRIVATE_SIZE); + fd->secret = PR_NEW(PRFilePrivate); if (NULL == fd->secret) PR_DELETE(fd); } if (NULL != fd) goto finished; diff --git a/pr/src/io/prfile.c b/pr/src/io/prfile.c index ab16e27a..7bc34e6a 100644 --- a/pr/src/io/prfile.c +++ b/pr/src/io/prfile.c @@ -155,7 +155,6 @@ static PRInt64 PR_CALLBACK FileAvailable64(PRFileDesc *fd) return result; } -#ifndef WIN32 static PRInt32 PR_CALLBACK PipeAvailable(PRFileDesc *fd) { PRInt32 rv; @@ -169,8 +168,16 @@ static PRInt64 PR_CALLBACK PipeAvailable64(PRFileDesc *fd) LL_I2L(rv, _PR_MD_PIPEAVAILABLE(fd)); return rv; } + +static PRStatus PR_CALLBACK PipeSync(PRFileDesc *fd) +{ +#if defined(XP_MAC) +#pragma unused (fd) #endif + return PR_SUCCESS; +} + static PRStatus PR_CALLBACK FileInfo(PRFileDesc *fd, PRFileInfo *info) { PRInt32 rv; @@ -283,14 +290,9 @@ static PRIOMethods _pr_pipeMethods = { FileClose, FileRead, FileWrite, -#ifdef WIN32 - FileAvailable, - FileAvailable64, -#else PipeAvailable, PipeAvailable64, -#endif - FileSync, + PipeSync, (PRSeekFN)_PR_InvalidInt, (PRSeek64FN)_PR_InvalidInt64, (PRFileInfoFN)_PR_InvalidStatus, @@ -322,6 +324,11 @@ static PRIOMethods _pr_pipeMethods = { (PRReservedFN)_PR_InvalidInt }; +PR_IMPLEMENT(const PRIOMethods*) PR_GetPipeMethods(void) +{ + return &_pr_pipeMethods; +} + PR_IMPLEMENT(PRFileDesc*) PR_Open(const char *name, PRIntn flags, PRIntn mode) { PRInt32 osfd; diff --git a/pr/src/md/windows/ntio.c b/pr/src/md/windows/ntio.c index 201bee2f..1b60a380 100644 --- a/pr/src/md/windows/ntio.c +++ b/pr/src/md/windows/ntio.c @@ -2475,6 +2475,16 @@ _PR_MD_SOCKETAVAILABLE(PRFileDesc *fd) return result; } +PRInt32 +_PR_MD_PIPEAVAILABLE(PRFileDesc *fd) +{ + if (NULL == fd) + PR_SetError(PR_BAD_DESCRIPTOR_ERROR, 0); + else + PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0); + return -1; +} + PROffset32 _PR_MD_LSEEK(PRFileDesc *fd, PROffset32 offset, int whence) { diff --git a/pr/src/md/windows/w95io.c b/pr/src/md/windows/w95io.c index c2553fc7..6e969a98 100644 --- a/pr/src/md/windows/w95io.c +++ b/pr/src/md/windows/w95io.c @@ -923,3 +923,12 @@ _PR_MD_UNLOCKFILE(PRInt32 f) } } /* end _PR_MD_UNLOCKFILE() */ +PRInt32 +_PR_MD_PIPEAVAILABLE(PRFileDesc *fd) +{ + if (NULL == fd) + PR_SetError(PR_BAD_DESCRIPTOR_ERROR, 0); + else + PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0); + return -1; +} diff --git a/pr/src/pthreads/ptio.c b/pr/src/pthreads/ptio.c index accca747..f1e6a255 100644 --- a/pr/src/pthreads/ptio.c +++ b/pr/src/pthreads/ptio.c @@ -190,7 +190,7 @@ static PRBool IsValidNetAddrLen(const PRNetAddr *addr, PRInt32 addr_len) * The polling interval defines the maximum amount of time that a thread * might hang up before an interrupt is noticed. */ -#define PT_DEFAULT_POLL_MSEC 100 +#define PT_DEFAULT_POLL_MSEC 5000 /* * pt_SockLen is the type for the length of a socket address @@ -217,17 +217,11 @@ typedef PRBool (*ContinuationFn)(pt_Continuation *op, PRInt16 revents); typedef enum pr_ContuationStatus { pt_continuation_pending, - pt_continuation_recycle, - pt_continuation_abort, pt_continuation_done } pr_ContuationStatus; struct pt_Continuation { - /* These objects are linked in ascending timeout order */ - pt_Continuation *next, *prev; /* self linked list of these things */ - - PRFileDesc *fd; /* The building of the continuation operation */ ContinuationFn function; /* what function to continue */ union { PRIntn osfd; } arg1; /* #1 - the op's fd */ @@ -258,7 +252,6 @@ struct pt_Continuation #endif /* HPUX11 */ PRIntervalTime timeout; /* client (relative) timeout */ - PRIntervalTime absolute; /* internal (absolute) timeout */ PRInt16 event; /* flags for poll()'s events */ @@ -271,27 +264,8 @@ struct pt_Continuation PRIntn syserrno; /* in case it failed, why (errno) */ pr_ContuationStatus status; /* the status of the operation */ - PRCondVar *complete; /* to notify the initiating thread */ - PRIntn io_tq_index; /* io-queue index */ }; -static struct pt_TimedQueue -{ - PRLock *ml; /* a little protection */ - PRThread *thread; /* internal thread's identification */ - PRUintn op_count; /* number of operations in the list */ - pt_Continuation *head, *tail; /* head/tail of list of operations */ - - pt_Continuation *op; /* timed operation furthest in future */ - struct pollfd *pollingList; /* list built for polling */ - PRIntn pollingSlotsAllocated; /* # entries available in list */ - pt_Continuation **pollingOps; /* list paralleling polling list */ -} *pt_tqp; /* an array */ - -static PRIntn _pt_num_cpus; -PRIntn _pt_tq_count; /* size of the pt_tqp array */ -static PRInt32 _pt_tq_index; /* atomically incremented */ - #if defined(DEBUG) PTDebug pt_debug; /* this is shared between several modules */ @@ -316,16 +290,6 @@ PR_IMPLEMENT(void) PT_FPrintStats(PRFileDesc *debug_out, const char *msg) PR_fprintf( debug_out, "\tstarted: %s[%lld]\n", buffer, elapsed); PR_fprintf( - debug_out, "\tmissed predictions: %u\n", stats.predictionsFoiled); - PR_fprintf( - debug_out, "\tpollingList max: %u\n", stats.pollingListMax); - PR_fprintf( - debug_out, "\tcontinuations served: %u\n", stats.continuationsServed); - PR_fprintf( - debug_out, "\trecycles needed: %u\n", stats.recyclesNeeded); - PR_fprintf( - debug_out, "\tquiescent IO: %u\n", stats.quiescentIO); - PR_fprintf( debug_out, "\tlocks [created: %u, destroyed: %u]\n", stats.locks_created, stats.locks_destroyed); PR_fprintf( @@ -341,547 +305,135 @@ PR_IMPLEMENT(void) PT_FPrintStats(PRFileDesc *debug_out, const char *msg) #endif /* DEBUG */ -/* - * The following two functions, pt_InsertTimedInternal and - * pt_FinishTimedInternal, are always called with the tqp->ml - * lock held. The "internal" in the functions' names come from - * the Mesa programming language. Internal functions are always - * called from inside a monitor. - */ - -static void pt_InsertTimedInternal(pt_Continuation *op) +static void pt_poll_now(pt_Continuation *op) { - pt_Continuation *t_op = NULL; - PRIntervalTime now = PR_IntervalNow(); - struct pt_TimedQueue *tqp = &pt_tqp[op->io_tq_index]; - -#if defined(DEBUG) - { - PRIntn count; - pt_Continuation *tmp; - PRThread *self = PR_GetCurrentThread(); - - PR_ASSERT(tqp == &pt_tqp[self->io_tq_index]); - PR_ASSERT((tqp->head == NULL) == (tqp->tail == NULL)); - PR_ASSERT((tqp->head == NULL) == (tqp->op_count == 0)); - for (tmp = tqp->head, count = 0; tmp != NULL; tmp = tmp->next) count += 1; - PR_ASSERT(count == tqp->op_count); - for (tmp = tqp->tail, count = 0; tmp != NULL; tmp = tmp->prev) count += 1; - PR_ASSERT(count == tqp->op_count); - for (tmp = tqp->head; tmp != NULL; tmp = tmp->next) - PR_ASSERT(tmp->io_tq_index == op->io_tq_index); - } -#endif /* defined(DEBUG) */ - - /* - * If this element operation isn't timed, it gets queued at the - * end of the list (just after tqp->tail) and we're - * finishd early. - */ - if (PR_INTERVAL_NO_TIMEOUT == op->timeout) - { - t_op = tqp->tail; /* put it at the end */ - goto done; - } - - /* - * The portion of this routine deals with timed ops. - */ - op->absolute = now + op->timeout; /* absolute ticks */ - if (NULL == tqp->op) tqp->op = op; - else - { - /* - * To find where in the list to put the new operation, based - * on the absolute time the operation in question will expire. - * - * The new operation ('op') will expire at now() + op->timeout. - * - * This should be easy! - */ - - for (t_op = tqp->op; NULL != t_op; t_op = t_op->prev) - { - /* - * If 'op' expires later than t_op, then insert 'op' just - * ahead of t_op. Otherwise, compute when operation[n-1] - * expires and try again. - * - * The actual difference between the expiriation of 'op' - * and the current operation what becomes the new operaton's - * timeout interval. That interval is also subtracted from - * the interval of the operation immediately following where - * we stick 'op' (unless the next one isn't timed). The new - * timeout assigned to 'op' takes into account the values of - * now() and when the previous intervals were computed. - */ - if ((PRInt32)(op->absolute - t_op->absolute) >= 0) - { - if (t_op == tqp->op) tqp->op = op; - break; - } - } - } - -done: - - /* - * Insert 'op' into the queue just after t_op or if t_op is null, - * at the head of the list. - * - * We need to set up the 'next' and 'prev' pointers of 'op' - * correctly before inserting 'op' into the queue. Also, we insert - * 'op' by updating tqp->head or op->prev->next first, and then - * updating op->next->prev. We want to make sure that the 'next' - * pointers are linked up correctly at all times so that we can - * traverse the queue by starting with tqp->head and following - * the 'next' pointers, without having to acquire the tqp->ml lock. - * (we do that in pt_ContinuationThreadInternal). We traverse the 'prev' - * pointers only in this function, which is called with the lock held. - * - * Similar care is taken in pt_FinishTimedInternal where we remove - * an op from the queue. - */ - if (NULL == t_op) - { - op->prev = NULL; - op->next = tqp->head; - tqp->head = op; - if (NULL == tqp->tail) tqp->tail = op; - else op->next->prev = op; - } - else - { - op->prev = t_op; - op->next = t_op->next; - if (NULL != op->prev) - op->prev->next = op; - if (NULL != op->next) - op->next->prev = op; - if (t_op == tqp->tail) - tqp->tail = op; - } - - tqp->op_count += 1; - -#if defined(DEBUG) - { - PRIntn count; - pt_Continuation *tmp; - PR_ASSERT(tqp->head != NULL); - PR_ASSERT(tqp->tail != NULL); - PR_ASSERT(tqp->op_count != 0); - PR_ASSERT(tqp->head->prev == NULL); - PR_ASSERT(tqp->tail->next == NULL); - if (tqp->op_count > 1) - { - PR_ASSERT(tqp->head->next != NULL); - PR_ASSERT(tqp->tail->prev != NULL); - } - else - { - PR_ASSERT(tqp->head->next == NULL); - PR_ASSERT(tqp->tail->prev == NULL); - } - for (tmp = tqp->head, count = 0; tmp != NULL; tmp = tmp->next) count += 1; - PR_ASSERT(count == tqp->op_count); - for (tmp = tqp->tail, count = 0; tmp != NULL; tmp = tmp->prev) count += 1; - PR_ASSERT(count == tqp->op_count); - } -#endif /* defined(DEBUG) */ - -} /* pt_InsertTimedInternal */ - -/* - * function: pt_FinishTimedInternal - * - * Takes the finished operation out of the timed queue. It - * notifies the initiating thread that the opertions is - * complete and returns to the caller the value of the next - * operation in the list (or NULL). - */ -static pt_Continuation *pt_FinishTimedInternal(pt_Continuation *op) -{ - pt_Continuation *next; - struct pt_TimedQueue *tqp = &pt_tqp[op->io_tq_index]; - -#if defined(DEBUG) - { - PRIntn count; - pt_Continuation *tmp; - PR_ASSERT(tqp->head != NULL); - PR_ASSERT(tqp->tail != NULL); - PR_ASSERT(tqp->op_count != 0); - PR_ASSERT(tqp->head->prev == NULL); - PR_ASSERT(tqp->tail->next == NULL); - if (tqp->op_count > 1) - { - PR_ASSERT(tqp->head->next != NULL); - PR_ASSERT(tqp->tail->prev != NULL); - } - else - { - PR_ASSERT(tqp->head->next == NULL); - PR_ASSERT(tqp->tail->prev == NULL); - } - for (tmp = tqp->head, count = 0; tmp != NULL; tmp = tmp->next) count += 1; - PR_ASSERT(count == tqp->op_count); - for (tmp = tqp->tail, count = 0; tmp != NULL; tmp = tmp->prev) count += 1; - PR_ASSERT(count == tqp->op_count); - } -#endif /* defined(DEBUG) */ - - /* remove this one from the list */ - if (NULL == op->prev) tqp->head = op->next; - else op->prev->next = op->next; - if (NULL == op->next) tqp->tail = op->prev; - else op->next->prev = op->prev; - - /* did we happen to hit the timed op? */ - if (op == tqp->op) tqp->op = op->prev; - - next = op->next; - op->next = op->prev = NULL; - op->status = pt_continuation_done; - - tqp->op_count -= 1; - -#if defined(DEBUG) - pt_debug.continuationsServed += 1; -#endif - PR_NotifyCondVar(op->complete); - -#if defined(DEBUG) - { - PRIntn count; - pt_Continuation *tmp; - PR_ASSERT((tqp->head == NULL) == (tqp->tail == NULL)); - PR_ASSERT((tqp->head == NULL) == (tqp->op_count == 0)); - for (tmp = tqp->head, count = 0; tmp != NULL; tmp = tmp->next) count += 1; - PR_ASSERT(count == tqp->op_count); - for (tmp = tqp->tail, count = 0; tmp != NULL; tmp = tmp->prev) count += 1; - PR_ASSERT(count == tqp->op_count); - } -#endif /* defined(DEBUG) */ - - return next; -} /* pt_FinishTimedInternal */ - -static void pt_ContinuationThreadInternal(pt_Continuation *my_op) -{ - /* initialization */ - PRInt32 msecs, mx_poll_ticks; - PRThreadPriority priority; /* used to save caller's prio */ - PRIntn pollingListUsed; /* # entries used in the list */ - PRIntn pollingListNeeded; /* # entries needed this time */ - PRIntn io_tq_index = my_op->io_tq_index; - struct pt_TimedQueue *tqp = &pt_tqp[my_op->io_tq_index]; - struct pollfd *pollingList = tqp->pollingList; - PRIntn pollingSlotsAllocated = tqp->pollingSlotsAllocated; - pt_Continuation **pollingOps = tqp->pollingOps; - - PR_Unlock(tqp->ml); /* don't need that silly lock for a bit */ - - priority = PR_GetThreadPriority(tqp->thread); - PR_SetThreadPriority(tqp->thread, PR_PRIORITY_HIGH); - - mx_poll_ticks = (PRInt32)PR_MillisecondsToInterval(PT_DEFAULT_POLL_MSEC); - - /* do some real work */ - while (PR_TRUE) - { - PRIntn rv; - PRInt32 timeout; - PRIntn pollIndex; - PRIntervalTime now; - pt_Continuation *op, *next_op; - - PR_ASSERT(NULL != tqp->head); - - pollingListNeeded = tqp->op_count; - - /* - * We are not holding the tqp->ml lock now, so more items may - * get added to pt_tq during this window of time. We hope - * that 10 more spaces in the polling list should be enough. - * - * The space allocated is for both a vector that parallels the - * polling list, providing pointers directly into the operation's - * table and the polling list itself. There is a guard element - * between the two structures. - */ - pollingListNeeded += 10; - if (pollingListNeeded > pollingSlotsAllocated) - { - if (NULL != pollingOps) PR_Free(pollingOps); - pollingOps = (pt_Continuation**)PR_Malloc( - sizeof(pt_Continuation**) + pollingListNeeded * - (sizeof(struct pollfd) + sizeof(pt_Continuation*))); - PR_ASSERT(NULL != pollingOps); - tqp->pollingOps = pollingOps; - pollingSlotsAllocated = pollingListNeeded; - tqp->pollingSlotsAllocated = pollingSlotsAllocated; - pollingOps[pollingSlotsAllocated] = (pt_Continuation*)-1; - pollingList = (struct pollfd*)(&pollingOps[pollingSlotsAllocated + 1]); - tqp->pollingList = pollingList; - - } - -#if defined(DEBUG) - if (pollingListNeeded > pt_debug.pollingListMax) - pt_debug.pollingListMax = pollingListNeeded; -#endif - - /* - ** This is interrupt processing. If this thread was interrupted, - ** the thread state will have the PT_THREAD_ABORTED bit set. This - ** overrides good completions as well as timeouts. - ** - ** BTW, it does no good to hold the lock here. This lock doesn't - ** protect the thread structure in any way. Testing the bit and - ** (perhaps) resetting it are safe 'cause it's the only modifiable - ** bit in that word. - */ - if (_PT_THREAD_INTERRUPTED(tqp->thread)) - { - my_op->status = pt_continuation_abort; - tqp->thread->state &= ~PT_THREAD_ABORTED; - } - - - /* - * Build up a polling list. - * This list is sorted on time. Operations that have been - * interrupted are completed and not included in the list. - * There is an assertion that the operation is in progress. - */ - pollingListUsed = 0; - PR_Lock(tqp->ml); - - for (op = tqp->head; NULL != op;) - { - if (pt_continuation_abort == op->status) - { - op->result.code = -1; - op->syserrno = EINTR; - next_op = pt_FinishTimedInternal(op); - if (op == my_op) goto recycle; - else op = next_op; - PR_ASSERT(NULL != tqp->head); - } - else - { - op->status = pt_continuation_pending; - if (pollingListUsed >= pollingSlotsAllocated) - { -#if defined(DEBUG) - pt_debug.predictionsFoiled += 1; -#endif - break; - } - PR_ASSERT((pt_Continuation*)-1 == pollingOps[pollingSlotsAllocated]); - /* - * eventMask bitmasks are declared as PRIntn so that - * each bitmask can be updated individually without - * disturbing adjacent memory, but only the lower 16 - * bits of a bitmask are used. - */ - op->fd->secret->eventMask[io_tq_index] = 0xffff; - pollingOps[pollingListUsed] = op; - pollingList[pollingListUsed].revents = 0; - pollingList[pollingListUsed].fd = op->arg1.osfd; - pollingList[pollingListUsed].events = op->event; - pollingListUsed += 1; - op = op->next; - } - } - - /* - * We don't want to wait forever on this poll. So keep - * the interval down. The operations, if they are timed, - * still have to timeout, while those that are not timed - * should persist forever. But they may be aborted. That's - * what this anxiety is all about. - */ - if (PR_INTERVAL_NO_TIMEOUT == tqp->head->timeout) - msecs = PT_DEFAULT_POLL_MSEC; - else - { - timeout = tqp->head->absolute - PR_IntervalNow(); - if (timeout <= 0) msecs = 0; /* already timed out */ - else if (timeout >= mx_poll_ticks) msecs = PT_DEFAULT_POLL_MSEC; - else msecs = (PRInt32)PR_IntervalToMilliseconds(timeout); - } - - PR_Unlock(tqp->ml); - - /* - * If 'op' isn't NULL at this point, then we didn't get to - * the end of the list. That means that more items got added - * to the list than we anticipated. So, forget this iteration, - * go around the horn again. - * - * One would hope this doesn't happen all that often. - */ - if (NULL != op) continue; /* make it rethink things */ - - PR_ASSERT((pt_Continuation*)-1 == pollingOps[pollingSlotsAllocated]); - - rv = poll(pollingList, pollingListUsed, msecs); - - if ((-1 == rv) && ((errno == EINTR) || (errno == EAGAIN))) - continue; /* go around the loop again */ - - if (rv > 0) - { - /* - * poll() says that something in our list is ready for some more - * action. Find it, load up the operation and see what happens. - */ - - /* - * This may work out okay. The rule is that only this thread, - * the continuation thread, can remove elements from the list. - * Therefore, the list is at worst, longer than when we built - * the polling list. - */ - - for (pollIndex = 0; pollIndex < pollingListUsed; ++pollIndex) - { - PRInt16 events = pollingList[pollIndex].events; - PRInt16 revents = pollingList[pollIndex].revents; - - op = pollingOps[pollIndex]; /* this is the operation */ - - /* (ref: Bug #153459) - ** In case of POLLERR we let the operation retry in hope - ** of getting a more definitive OS error. - */ - if ((revents & POLLNVAL) /* busted in all cases */ - || ((events & POLLOUT) && (revents & POLLHUP))) /* write op & hup */ - { - PR_Lock(tqp->ml); - op->result.code = -1; - if (POLLNVAL & revents) op->syserrno = EBADF; - else if (POLLHUP & revents) op->syserrno = EPIPE; - (void)pt_FinishTimedInternal(op); - if (op == my_op) goto recycle; - PR_Unlock(tqp->ml); - } - else if ((0 != (revents & op->fd->secret->eventMask[io_tq_index])) - && (pt_continuation_pending == op->status)) - { - /* - * Only good?(?) revents left. Operations not pending - * will be pruned next time we build a list. This operation - * will be pruned if the continuation indicates it is - * finished. - */ - - if (op->function(op, revents)) - { - PR_Lock(tqp->ml); - (void)pt_FinishTimedInternal(op); - if (op == my_op) goto recycle; - PR_Unlock(tqp->ml); - } - else - { - /* - * If the continuation function returns - * PR_FALSE, it means available data have - * been read, output buffer space has been - * filled, or pending connections have been - * accepted by prior calls. If the - * continuation function is immediately - * invoked again, it will most likely - * return PR_FALSE. So turn off these - * events in the event mask for this fd so - * that if this fd is encountered again in - * the polling list with these events on, - * we won't invoke the continuation - * function again. - */ - op->fd->secret->eventMask[io_tq_index] &= ~revents; - } - } - } - } - - /* - * This is timeout processing. It is done after checking - * for good completions. Those that just made it under the - * wire are lucky, but none the less, valid. - */ - now = PR_IntervalNow(); - PR_Lock(tqp->ml); - while ((NULL != tqp->head) - && (PR_INTERVAL_NO_TIMEOUT != tqp->head->timeout)) - { - op = tqp->head; /* get a copy of this before finishing it */ - if ((PRInt32)(op->absolute - now) > 0) break; - /* - * The head element of the timed queue has timed out. Record - * the reason for completion and take it out of the list. - */ - op->result.code = -1; - op->syserrno = ETIMEDOUT; - (void)pt_FinishTimedInternal(op); - - /* - * If it's 'my_op' then we have to switch threads. Exit w/o - * finishing the scan. The scan will be completed when another - * thread calls in acting as the continuation thread. - */ - if (op == my_op) goto recycle; /* exit w/o unlocking */ - } - PR_Unlock(tqp->ml); - } - - PR_NOT_REACHED("This is a while(true) loop /w no breaks"); - -recycle: - /* - ** Recycling the continuation thread. - ** - ** The thread we were using for I/O continuations just completed - ** the I/O it submitted. It has to return to it's caller. We need - ** another thread to act in the continuation role. We can do that - ** by taking any operation off the timed queue, setting its state - ** to 'recycle' and notifying the condition. - ** - ** Selecting a likely thread seems like magic. I'm going to try - ** using one that has the longest (or no) timeout, tqp->tail. - ** If that slot's empty, then there's no outstanding I/O and we - ** don't need a thread at all. - ** - ** BTW, we're locked right now, and we'll be returning with the - ** the lock held as well. Seems odd, doesn't it? - */ - - /* $$$ should this be called with the lock held? $$$ */ - PR_SetThreadPriority(tqp->thread, priority); /* reset back to caller's */ - - PR_ASSERT((NULL == tqp->head) == (0 == tqp->op_count)); - PR_ASSERT((NULL == tqp->head) == (NULL == tqp->tail)); - PR_ASSERT(pt_continuation_done == my_op->status); + PRInt32 msecs; + PRIntervalTime epoch, now, elapsed, remaining; + PRThread *self = PR_GetCurrentThread(); - if (NULL != tqp->tail) - { - if (tqp->tail->status != pt_continuation_abort) - { - tqp->tail->status = pt_continuation_recycle; - } - PR_NotifyCondVar(tqp->tail->complete); -#if defined(DEBUG) - pt_debug.recyclesNeeded += 1; -#endif - } -#if defined(DEBUG) - else pt_debug.quiescentIO += 1; -#endif - -} /* pt_ContinuationThreadInternal */ + PR_ASSERT(PR_INTERVAL_NO_WAIT != op->timeout); + + switch (op->timeout) { + case PR_INTERVAL_NO_TIMEOUT: + msecs = PT_DEFAULT_POLL_MSEC; + do + { + PRIntn rv; + struct pollfd tmp_pfd; + + tmp_pfd.revents = 0; + tmp_pfd.fd = op->arg1.osfd; + tmp_pfd.events = op->event; + + rv = poll(&tmp_pfd, 1, msecs); + + if (self->state & PT_THREAD_ABORTED) + { + self->state &= ~PT_THREAD_ABORTED; + op->result.code = -1; + op->syserrno = EINTR; + op->status = pt_continuation_done; + return; + } + + if ((-1 == rv) && ((errno == EINTR) || (errno == EAGAIN))) + continue; /* go around the loop again */ + + if (rv > 0) + { + PRIntn fd = tmp_pfd.fd; + PRInt16 events = tmp_pfd.events; + PRInt16 revents = tmp_pfd.revents; + + if ((revents & POLLNVAL) /* busted in all cases */ + || ((events & POLLOUT) && (revents & POLLHUP))) + /* write op & hup */ + { + op->result.code = -1; + if (POLLNVAL & revents) op->syserrno = EBADF; + else if (POLLHUP & revents) op->syserrno = EPIPE; + op->status = pt_continuation_done; + } else { + if (op->function(op, revents)) + op->status = pt_continuation_done; + } + } else if (rv == -1) { + op->result.code = -1; + op->syserrno = errno; + op->status = pt_continuation_done; + } + /* else, poll timed out */ + } while (pt_continuation_done != op->status); + break; + default: + now = epoch = PR_IntervalNow(); + remaining = op->timeout; + do + { + PRIntn rv; + struct pollfd tmp_pfd; + + tmp_pfd.revents = 0; + tmp_pfd.fd = op->arg1.osfd; + tmp_pfd.events = op->event; + + msecs = (PRInt32)PR_IntervalToMilliseconds(remaining); + if (msecs > PT_DEFAULT_POLL_MSEC) + msecs = PT_DEFAULT_POLL_MSEC; + rv = poll(&tmp_pfd, 1, msecs); + + if (self->state & PT_THREAD_ABORTED) + { + self->state &= ~PT_THREAD_ABORTED; + op->result.code = -1; + op->syserrno = EINTR; + op->status = pt_continuation_done; + return; + } + + if (rv > 0) + { + PRIntn fd = tmp_pfd.fd; + PRInt16 events = tmp_pfd.events; + PRInt16 revents = tmp_pfd.revents; + + if ((revents & POLLNVAL) /* busted in all cases */ + || ((events & POLLOUT) && (revents & POLLHUP))) + /* write op & hup */ + { + op->result.code = -1; + if (POLLNVAL & revents) op->syserrno = EBADF; + else if (POLLHUP & revents) op->syserrno = EPIPE; + op->status = pt_continuation_done; + } else { + if (op->function(op, revents)) + { + op->status = pt_continuation_done; + } + } + } else if ((rv == 0) || + ((errno == EINTR) || (errno == EAGAIN))) { + if (rv == 0) /* poll timed out */ + now += PR_MillisecondsToInterval(msecs); + else + now = PR_IntervalNow(); + elapsed = (PRIntervalTime) (now - epoch); + if (elapsed >= op->timeout) { + op->result.code = -1; + op->syserrno = ETIMEDOUT; + op->status = pt_continuation_done; + } else + remaining = op->timeout - elapsed; + } else { + op->result.code = -1; + op->syserrno = errno; + op->status = pt_continuation_done; + } + } while (pt_continuation_done != op->status); + break; + } + +} /* pt_poll_now */ static PRIntn pt_Continue(pt_Continuation *op) { @@ -889,99 +441,13 @@ static PRIntn pt_Continue(pt_Continuation *op) PRThread *self = PR_GetCurrentThread(); struct pt_TimedQueue *tqp; - /* lazy assignment of the thread's ioq */ - if (-1 == self->io_tq_index) - { - self->io_tq_index = (PR_AtomicIncrement(&_pt_tq_index)-1) % _pt_tq_count; - } - - PR_ASSERT(self->io_tq_index >= 0); - tqp = &pt_tqp[self->io_tq_index]; - - /* lazy allocation of the thread's cv */ - if (NULL == self->io_cv) - self->io_cv = PR_NewCondVar(tqp->ml); - /* Finish filling in the blank slots */ - op->complete = self->io_cv; op->status = pt_continuation_pending; /* set default value */ - op->io_tq_index = self->io_tq_index; - PR_Lock(tqp->ml); /* we provide the locking */ - - pt_InsertTimedInternal(op); /* insert in the structure */ - - /* - ** At this point, we try to decide whether there is a continuation - ** thread, or whether we should assign this one to serve in that role. - */ - do - { - if (NULL == tqp->thread) - { - /* - ** We're the one. Call the processing function with the lock - ** held. It will return with it held as well, though there - ** will certainly be times within the function when it gets - ** released. - */ - tqp->thread = self; /* I'm taking control */ - pt_ContinuationThreadInternal(op); /* go slash and burn */ - PR_ASSERT(pt_continuation_done == op->status); - tqp->thread = NULL; /* I'm abdicating my rule */ - } - else - { - rv = PR_WaitCondVar(op->complete, PR_INTERVAL_NO_TIMEOUT); - /* - * If we get interrupted, we set state the continuation thread will - * see and allow it to finish the I/O operation w/ error. That way - * the rule that only the continuation thread is removing elements - * from the list is still valid. - * - * Don't call interrupt on the continuation thread. That'll just - * irritate him. He's cycling around at least every mx_poll_ticks - * anyhow and should notice the request in there. When he does - * notice, this operation will be finished and the op's status - * marked as pt_continuation_done. - */ - if ((PR_FAILURE == rv) /* the wait was interrupted */ - && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) - { - if (pt_continuation_done == op->status) - { - /* - * The op is done and has been removed - * from the timed queue. We must not - * change op->status, otherwise this - * thread will go around the loop again. - * - * It's harsh to mark the op failed with - * interrupt error when the io is already - * done, but we should indicate the fact - * that the thread was interrupted. So - * we set the aborted flag to abort the - * thread's next blocking call. Is this - * the right thing to do? - */ - self->state |= PT_THREAD_ABORTED; - } - else - { - /* go around the loop again */ - op->status = pt_continuation_abort; - } - } - /* - * If we're to recycle, continue within this loop. This will - * cause this thread to become the continuation thread. - */ - - } - } while (pt_continuation_done != op->status); - - - PR_Unlock(tqp->ml); /* we provided the locking */ - - return op->result.code; /* and the primary answer */ + /* + * let each thread call poll directly + */ + pt_poll_now(op); + PR_ASSERT(pt_continuation_done == op->status); + return op->result.code; } /* pt_Continue */ /*****************************************************************************/ @@ -1290,52 +756,8 @@ static PRBool pt_hpux_sendfile_cont(pt_Continuation *op, PRInt16 revents) } #endif /* HPUX11 */ -#define _MD_CPUS_ONLINE 2 - void _PR_InitIO() { - PRIntn index; - char *num_io_queues; - - if (NULL != (num_io_queues = getenv("NSPR_NUM_IO_QUEUES"))) - { - _pt_tq_count = atoi(num_io_queues); - } - else - { - /* - * Get the number of CPUs if the pthread - * library has kernel-scheduled entities that - * can run on multiple CPUs. - */ -#ifdef HPUX11 - _pt_num_cpus = pthread_num_processors_np(); -#elif defined(IRIX) || defined(OSF1) - _pt_num_cpus = sysconf(_SC_NPROC_ONLN); -#elif defined(AIX) || defined(LINUX) || defined(SOLARIS) - _pt_num_cpus = sysconf(_SC_NPROCESSORS_ONLN); -#else - /* - * A pure user-level (Mx1) pthread library can - * only use one CPU, even on a multiprocessor. - */ - _pt_num_cpus = 1; -#endif - if (_pt_num_cpus < 0) - _pt_num_cpus = _MD_CPUS_ONLINE; - _pt_tq_count = _pt_num_cpus; - } - - pt_tqp = (struct pt_TimedQueue *) - PR_CALLOC(_pt_tq_count * sizeof(struct pt_TimedQueue)); - PR_ASSERT(NULL != pt_tqp); - - for (index = 0; index < _pt_tq_count; index++) - { - pt_tqp[index].ml = PR_NewLock(); - PR_ASSERT(NULL != pt_tqp[index].ml); - } - #if defined(DEBUG) memset(&pt_debug, 0, sizeof(PTDebug)); pt_debug.timeStarted = PR_Now(); @@ -1438,7 +860,6 @@ static PRInt32 pt_Read(PRFileDesc *fd, void *buf, PRInt32 amount) && (!fd->secret->nonblocking)) { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = buf; op.arg3.amount = amount; @@ -1479,7 +900,6 @@ static PRInt32 pt_Write(PRFileDesc *fd, const void *buf, PRInt32 amount) if (fNeedContinue == PR_TRUE) { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = (void*)buf; op.arg3.amount = amount; @@ -1573,7 +993,6 @@ static PRInt32 pt_Writev( { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = (void*)osiov; op.arg3.amount = osiov_len; @@ -1713,7 +1132,6 @@ static PRStatus pt_Connect( else { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; #ifdef _PR_HAVE_SOCKADDR_LEN op.arg2.buffer = (void*)&addrCopy; @@ -1788,7 +1206,6 @@ static PRFileDesc* pt_Accept( else { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = addr; op.arg3.addr_len = &addr_len; @@ -1919,7 +1336,6 @@ static PRInt32 pt_Recv( else { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = buf; op.arg3.amount = amount; @@ -2004,7 +1420,6 @@ static PRInt32 pt_Send( if (fNeedContinue == PR_TRUE) { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = (void*)buf; op.arg3.amount = amount; @@ -2064,7 +1479,6 @@ static PRInt32 pt_SendTo( if (fNeedContinue == PR_TRUE) { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = (void*)buf; op.arg3.amount = amount; @@ -2110,7 +1524,6 @@ static PRInt32 pt_RecvFrom(PRFileDesc *fd, void *buf, PRInt32 amount, if (fNeedContinue == PR_TRUE) { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = buf; op.arg3.amount = amount; @@ -2238,7 +1651,6 @@ static PRInt32 pt_AIXSendFile(PRFileDesc *sd, PRSendFileData *sfd, if ((rv == 1) || ((rv == -1) && (count == 0))) { pt_Continuation op; - op.fd = sd; op.arg1.osfd = sd->secret->md.osfd; op.arg2.buffer = &sf_struct; op.arg4.flags = send_flags; @@ -2357,7 +1769,6 @@ static PRInt32 pt_HPUXSendFile(PRFileDesc *sd, PRSendFileData *sfd, hdtrl[1].iov_len = sfd->tlen - trailer_nbytes_sent; } - op.fd = sd; op.arg1.osfd = sd->secret->md.osfd; op.filedesc = sfd->fd->secret->md.osfd; op.arg2.buffer = hdtrl; @@ -2915,7 +2326,7 @@ static PRIOMethods _pr_pipe_methods = { pt_Write, pt_Available_s, pt_Available64_s, - pt_Fsync, + pt_Synch, (PRSeekFN)_PR_InvalidInt, (PRSeek64FN)_PR_InvalidInt64, (PRFileInfoFN)_PR_InvalidStatus, @@ -3133,7 +2544,7 @@ PR_IMPLEMENT(const PRIOMethods*) PR_GetFileMethods() PR_IMPLEMENT(const PRIOMethods*) PR_GetPipeMethods() { return &_pr_pipe_methods; -} /* PR_GetFileMethods */ +} /* PR_GetPipeMethods */ PR_IMPLEMENT(const PRIOMethods*) PR_GetTCPMethods() { diff --git a/pr/src/pthreads/ptthread.c b/pr/src/pthreads/ptthread.c index 49f08da1..35f204f4 100644 --- a/pr/src/pthreads/ptthread.c +++ b/pr/src/pthreads/ptthread.c @@ -224,7 +224,6 @@ static PRThread* pt_AttachThread(void) PR_ASSERT(0 == rv); thred->state = PT_THREAD_GLOBAL | PT_THREAD_FOREIGN; - thred->io_tq_index = -1; PR_Lock(pt_book.ml); /* then put it into the list */ @@ -363,8 +362,6 @@ static PRThread* _PR_CreateThread( thred->stack->stackSize = stackSize; thred->stack->thr = thred; - thred->io_tq_index = -1; - #ifdef PT_NO_SIGTIMEDWAIT pthread_mutex_init(&thred->suspendResumeMutex,NULL); pthread_cond_init(&thred->suspendResumeCV,NULL); @@ -742,8 +739,6 @@ static void _pt_thread_death(void *arg) PR_Free(thred->privateData); if (NULL != thred->errorString) PR_Free(thred->errorString); - if (NULL != thred->io_cv) - PR_DestroyCondVar(thred->io_cv); PR_Free(thred->stack); #if defined(DEBUG) memset(thred, 0xaf, sizeof(PRThread)); @@ -809,8 +804,6 @@ void _PR_InitThreads( thred->stack->thr = thred; _PR_InitializeStack(thred->stack); - thred->io_tq_index = -1; - /* * Create a key for our use to store a backpointer in the pthread * to our PRThread object. This object gets deleted when the thread |