diff options
Diffstat (limited to 'pr')
-rw-r--r-- | pr/include/md/_win95.h | 1 | ||||
-rw-r--r-- | pr/include/md/_winnt.h | 1 | ||||
-rw-r--r-- | pr/include/private/primpl.h | 35 | ||||
-rw-r--r-- | pr/src/io/prfdcach.c | 4 | ||||
-rw-r--r-- | pr/src/io/prfile.c | 35 | ||||
-rw-r--r-- | pr/src/md/windows/ntio.c | 10 | ||||
-rw-r--r-- | pr/src/md/windows/w95io.c | 9 | ||||
-rw-r--r-- | pr/src/pthreads/ptio.c | 861 | ||||
-rw-r--r-- | pr/src/pthreads/ptthread.c | 7 |
9 files changed, 776 insertions, 187 deletions
diff --git a/pr/include/md/_win95.h b/pr/include/md/_win95.h index 6878b152..e82ddfe3 100644 --- a/pr/include/md/_win95.h +++ b/pr/include/md/_win95.h @@ -245,7 +245,6 @@ extern PRInt32 _MD_CloseSocket(PRInt32 osfd); #define _MD_SOCKET _PR_MD_SOCKET extern PRInt32 _MD_SocketAvailable(PRFileDesc *fd); #define _MD_SOCKETAVAILABLE _MD_SocketAvailable -#define _MD_PIPEAVAILABLE _PR_MD_PIPEAVAILABLE #define _MD_CONNECT _PR_MD_CONNECT extern PRInt32 _MD_Accept(PRFileDesc *fd, PRNetAddr *raddr, PRUint32 *rlen, PRIntervalTime timeout); diff --git a/pr/include/md/_winnt.h b/pr/include/md/_winnt.h index 85ddd505..77db2ddb 100644 --- a/pr/include/md/_winnt.h +++ b/pr/include/md/_winnt.h @@ -247,7 +247,6 @@ extern int _PR_NTFiberSafeSelect(int, fd_set *, fd_set *, fd_set *, const struct timeval *); #define _MD_FSYNC _PR_MD_FSYNC #define _MD_SOCKETAVAILABLE _PR_MD_SOCKETAVAILABLE -#define _MD_PIPEAVAILABLE _PR_MD_PIPEAVAILABLE #define _MD_SET_FD_INHERITABLE _PR_MD_SET_FD_INHERITABLE #define _MD_INIT_ATOMIC() diff --git a/pr/include/private/primpl.h b/pr/include/private/primpl.h index 8b96c424..ad468d2c 100644 --- a/pr/include/private/primpl.h +++ b/pr/include/private/primpl.h @@ -186,6 +186,11 @@ struct _PT_Notified typedef struct PTDebug { PRTime timeStarted; + PRUintn predictionsFoiled; + PRUintn pollingListMax; + PRUintn continuationsServed; + PRUintn recyclesNeeded; + PRUintn quiescentIO; PRUintn locks_created, locks_destroyed; PRUintn locks_acquired, locks_released; PRUintn cvars_created, cvars_destroyed; @@ -1416,7 +1421,9 @@ struct PRThread { #if defined(_PR_PTHREADS) pthread_t id; /* pthread identifier for the thread */ PRBool okToDelete; /* ok to delete the PRThread struct? */ + PRCondVar *io_cv; /* a condition used to run i/o */ PRCondVar *waiting; /* where the thread is waiting | NULL */ + PRIntn io_tq_index; /* the io-queue index for this thread */ void *sp; /* recorded sp for garbage collection */ PRThread *next, *prev; /* simple linked list of all threads */ PRUint32 suspend; /* used to store suspend and resume flags */ @@ -1542,8 +1549,36 @@ struct PRFilePrivate { PRFileDesc *next; PRIntn lockCount; _MDFileDesc md; +#ifdef _PR_PTHREADS + PRIntn eventMask[1]; /* An array of _pt_tq_count bitmasks. + * eventMask[i] is only accessed by + * the i-th i/o continuation thread. + * A 0 in a bitmask means the event + * should be igored in the revents + * bitmask returned by poll. + * + * poll's revents bitmask is a short, + * but we need to declare eventMask + * as an array of PRIntn's so that + * each bitmask can be updated + * individually without disturbing + * adjacent memory. Only the lower + * 16 bits of each PRIntn are used. */ +#endif +/* IMPORTANT: eventMask MUST BE THE LAST FIELD OF THIS STRUCTURE */ }; +/* + * The actual size of the PRFilePrivate structure, + * including the eventMask array at the end + */ +#ifdef _PR_PTHREADS +extern PRIntn _pt_tq_count; +#define PRFILEPRIVATE_SIZE (sizeof(PRFilePrivate) + (_pt_tq_count-1) * sizeof(PRIntn)) +#else +#define PRFILEPRIVATE_SIZE sizeof(PRFilePrivate) +#endif + struct PRDir { PRDirEntry d; _MDDir md; diff --git a/pr/src/io/prfdcach.c b/pr/src/io/prfdcach.c index 00947d31..6ad2e84c 100644 --- a/pr/src/io/prfdcach.c +++ b/pr/src/io/prfdcach.c @@ -115,14 +115,14 @@ finished: fd->dtor = NULL; fd->lower = fd->higher = NULL; fd->identity = PR_NSPR_IO_LAYER; - memset(fd->secret, 0, sizeof(PRFilePrivate)); + memset(fd->secret, 0, PRFILEPRIVATE_SIZE); return fd; allocate: fd = PR_NEW(PRFileDesc); if (NULL != fd) { - fd->secret = PR_NEW(PRFilePrivate); + fd->secret = (PRFilePrivate *) PR_MALLOC(PRFILEPRIVATE_SIZE); if (NULL == fd->secret) PR_DELETE(fd); } if (NULL != fd) goto finished; diff --git a/pr/src/io/prfile.c b/pr/src/io/prfile.c index 31540e6a..ab16e27a 100644 --- a/pr/src/io/prfile.c +++ b/pr/src/io/prfile.c @@ -155,7 +155,7 @@ static PRInt64 PR_CALLBACK FileAvailable64(PRFileDesc *fd) return result; } -#if defined(XP_UNIX) || defined(WIN32) +#ifndef WIN32 static PRInt32 PR_CALLBACK PipeAvailable(PRFileDesc *fd) { PRInt32 rv; @@ -169,29 +169,8 @@ static PRInt64 PR_CALLBACK PipeAvailable64(PRFileDesc *fd) LL_I2L(rv, _PR_MD_PIPEAVAILABLE(fd)); return rv; } -#else -static PRInt32 PR_CALLBACK PipeAvailable(PRFileDesc *fd) -{ - return -1; -} - -static PRInt64 PR_CALLBACK PipeAvailable64(PRFileDesc *fd) -{ - PRInt64 rv; - LL_I2L(rv, -1); - return rv; -} #endif -static PRStatus PR_CALLBACK PipeSync(PRFileDesc *fd) -{ -#if defined(XP_MAC) -#pragma unused (fd) -#endif - - return PR_SUCCESS; -} - static PRStatus PR_CALLBACK FileInfo(PRFileDesc *fd, PRFileInfo *info) { PRInt32 rv; @@ -304,9 +283,14 @@ static PRIOMethods _pr_pipeMethods = { FileClose, FileRead, FileWrite, +#ifdef WIN32 + FileAvailable, + FileAvailable64, +#else PipeAvailable, PipeAvailable64, - PipeSync, +#endif + FileSync, (PRSeekFN)_PR_InvalidInt, (PRSeek64FN)_PR_InvalidInt64, (PRFileInfoFN)_PR_InvalidStatus, @@ -338,11 +322,6 @@ static PRIOMethods _pr_pipeMethods = { (PRReservedFN)_PR_InvalidInt }; -PR_IMPLEMENT(const PRIOMethods*) PR_GetPipeMethods(void) -{ - return &_pr_pipeMethods; -} - PR_IMPLEMENT(PRFileDesc*) PR_Open(const char *name, PRIntn flags, PRIntn mode) { PRInt32 osfd; diff --git a/pr/src/md/windows/ntio.c b/pr/src/md/windows/ntio.c index 1b60a380..201bee2f 100644 --- a/pr/src/md/windows/ntio.c +++ b/pr/src/md/windows/ntio.c @@ -2475,16 +2475,6 @@ _PR_MD_SOCKETAVAILABLE(PRFileDesc *fd) return result; } -PRInt32 -_PR_MD_PIPEAVAILABLE(PRFileDesc *fd) -{ - if (NULL == fd) - PR_SetError(PR_BAD_DESCRIPTOR_ERROR, 0); - else - PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0); - return -1; -} - PROffset32 _PR_MD_LSEEK(PRFileDesc *fd, PROffset32 offset, int whence) { diff --git a/pr/src/md/windows/w95io.c b/pr/src/md/windows/w95io.c index 6e969a98..c2553fc7 100644 --- a/pr/src/md/windows/w95io.c +++ b/pr/src/md/windows/w95io.c @@ -923,12 +923,3 @@ _PR_MD_UNLOCKFILE(PRInt32 f) } } /* end _PR_MD_UNLOCKFILE() */ -PRInt32 -_PR_MD_PIPEAVAILABLE(PRFileDesc *fd) -{ - if (NULL == fd) - PR_SetError(PR_BAD_DESCRIPTOR_ERROR, 0); - else - PR_SetError(PR_NOT_IMPLEMENTED_ERROR, 0); - return -1; -} diff --git a/pr/src/pthreads/ptio.c b/pr/src/pthreads/ptio.c index f1e6a255..accca747 100644 --- a/pr/src/pthreads/ptio.c +++ b/pr/src/pthreads/ptio.c @@ -190,7 +190,7 @@ static PRBool IsValidNetAddrLen(const PRNetAddr *addr, PRInt32 addr_len) * The polling interval defines the maximum amount of time that a thread * might hang up before an interrupt is noticed. */ -#define PT_DEFAULT_POLL_MSEC 5000 +#define PT_DEFAULT_POLL_MSEC 100 /* * pt_SockLen is the type for the length of a socket address @@ -217,11 +217,17 @@ typedef PRBool (*ContinuationFn)(pt_Continuation *op, PRInt16 revents); typedef enum pr_ContuationStatus { pt_continuation_pending, + pt_continuation_recycle, + pt_continuation_abort, pt_continuation_done } pr_ContuationStatus; struct pt_Continuation { + /* These objects are linked in ascending timeout order */ + pt_Continuation *next, *prev; /* self linked list of these things */ + + PRFileDesc *fd; /* The building of the continuation operation */ ContinuationFn function; /* what function to continue */ union { PRIntn osfd; } arg1; /* #1 - the op's fd */ @@ -252,6 +258,7 @@ struct pt_Continuation #endif /* HPUX11 */ PRIntervalTime timeout; /* client (relative) timeout */ + PRIntervalTime absolute; /* internal (absolute) timeout */ PRInt16 event; /* flags for poll()'s events */ @@ -264,8 +271,27 @@ struct pt_Continuation PRIntn syserrno; /* in case it failed, why (errno) */ pr_ContuationStatus status; /* the status of the operation */ + PRCondVar *complete; /* to notify the initiating thread */ + PRIntn io_tq_index; /* io-queue index */ }; +static struct pt_TimedQueue +{ + PRLock *ml; /* a little protection */ + PRThread *thread; /* internal thread's identification */ + PRUintn op_count; /* number of operations in the list */ + pt_Continuation *head, *tail; /* head/tail of list of operations */ + + pt_Continuation *op; /* timed operation furthest in future */ + struct pollfd *pollingList; /* list built for polling */ + PRIntn pollingSlotsAllocated; /* # entries available in list */ + pt_Continuation **pollingOps; /* list paralleling polling list */ +} *pt_tqp; /* an array */ + +static PRIntn _pt_num_cpus; +PRIntn _pt_tq_count; /* size of the pt_tqp array */ +static PRInt32 _pt_tq_index; /* atomically incremented */ + #if defined(DEBUG) PTDebug pt_debug; /* this is shared between several modules */ @@ -290,6 +316,16 @@ PR_IMPLEMENT(void) PT_FPrintStats(PRFileDesc *debug_out, const char *msg) PR_fprintf( debug_out, "\tstarted: %s[%lld]\n", buffer, elapsed); PR_fprintf( + debug_out, "\tmissed predictions: %u\n", stats.predictionsFoiled); + PR_fprintf( + debug_out, "\tpollingList max: %u\n", stats.pollingListMax); + PR_fprintf( + debug_out, "\tcontinuations served: %u\n", stats.continuationsServed); + PR_fprintf( + debug_out, "\trecycles needed: %u\n", stats.recyclesNeeded); + PR_fprintf( + debug_out, "\tquiescent IO: %u\n", stats.quiescentIO); + PR_fprintf( debug_out, "\tlocks [created: %u, destroyed: %u]\n", stats.locks_created, stats.locks_destroyed); PR_fprintf( @@ -305,135 +341,547 @@ PR_IMPLEMENT(void) PT_FPrintStats(PRFileDesc *debug_out, const char *msg) #endif /* DEBUG */ -static void pt_poll_now(pt_Continuation *op) +/* + * The following two functions, pt_InsertTimedInternal and + * pt_FinishTimedInternal, are always called with the tqp->ml + * lock held. The "internal" in the functions' names come from + * the Mesa programming language. Internal functions are always + * called from inside a monitor. + */ + +static void pt_InsertTimedInternal(pt_Continuation *op) { - PRInt32 msecs; - PRIntervalTime epoch, now, elapsed, remaining; - PRThread *self = PR_GetCurrentThread(); + pt_Continuation *t_op = NULL; + PRIntervalTime now = PR_IntervalNow(); + struct pt_TimedQueue *tqp = &pt_tqp[op->io_tq_index]; + +#if defined(DEBUG) + { + PRIntn count; + pt_Continuation *tmp; + PRThread *self = PR_GetCurrentThread(); + + PR_ASSERT(tqp == &pt_tqp[self->io_tq_index]); + PR_ASSERT((tqp->head == NULL) == (tqp->tail == NULL)); + PR_ASSERT((tqp->head == NULL) == (tqp->op_count == 0)); + for (tmp = tqp->head, count = 0; tmp != NULL; tmp = tmp->next) count += 1; + PR_ASSERT(count == tqp->op_count); + for (tmp = tqp->tail, count = 0; tmp != NULL; tmp = tmp->prev) count += 1; + PR_ASSERT(count == tqp->op_count); + for (tmp = tqp->head; tmp != NULL; tmp = tmp->next) + PR_ASSERT(tmp->io_tq_index == op->io_tq_index); + } +#endif /* defined(DEBUG) */ + + /* + * If this element operation isn't timed, it gets queued at the + * end of the list (just after tqp->tail) and we're + * finishd early. + */ + if (PR_INTERVAL_NO_TIMEOUT == op->timeout) + { + t_op = tqp->tail; /* put it at the end */ + goto done; + } + + /* + * The portion of this routine deals with timed ops. + */ + op->absolute = now + op->timeout; /* absolute ticks */ + if (NULL == tqp->op) tqp->op = op; + else + { + /* + * To find where in the list to put the new operation, based + * on the absolute time the operation in question will expire. + * + * The new operation ('op') will expire at now() + op->timeout. + * + * This should be easy! + */ + + for (t_op = tqp->op; NULL != t_op; t_op = t_op->prev) + { + /* + * If 'op' expires later than t_op, then insert 'op' just + * ahead of t_op. Otherwise, compute when operation[n-1] + * expires and try again. + * + * The actual difference between the expiriation of 'op' + * and the current operation what becomes the new operaton's + * timeout interval. That interval is also subtracted from + * the interval of the operation immediately following where + * we stick 'op' (unless the next one isn't timed). The new + * timeout assigned to 'op' takes into account the values of + * now() and when the previous intervals were computed. + */ + if ((PRInt32)(op->absolute - t_op->absolute) >= 0) + { + if (t_op == tqp->op) tqp->op = op; + break; + } + } + } + +done: + + /* + * Insert 'op' into the queue just after t_op or if t_op is null, + * at the head of the list. + * + * We need to set up the 'next' and 'prev' pointers of 'op' + * correctly before inserting 'op' into the queue. Also, we insert + * 'op' by updating tqp->head or op->prev->next first, and then + * updating op->next->prev. We want to make sure that the 'next' + * pointers are linked up correctly at all times so that we can + * traverse the queue by starting with tqp->head and following + * the 'next' pointers, without having to acquire the tqp->ml lock. + * (we do that in pt_ContinuationThreadInternal). We traverse the 'prev' + * pointers only in this function, which is called with the lock held. + * + * Similar care is taken in pt_FinishTimedInternal where we remove + * an op from the queue. + */ + if (NULL == t_op) + { + op->prev = NULL; + op->next = tqp->head; + tqp->head = op; + if (NULL == tqp->tail) tqp->tail = op; + else op->next->prev = op; + } + else + { + op->prev = t_op; + op->next = t_op->next; + if (NULL != op->prev) + op->prev->next = op; + if (NULL != op->next) + op->next->prev = op; + if (t_op == tqp->tail) + tqp->tail = op; + } + + tqp->op_count += 1; + +#if defined(DEBUG) + { + PRIntn count; + pt_Continuation *tmp; + PR_ASSERT(tqp->head != NULL); + PR_ASSERT(tqp->tail != NULL); + PR_ASSERT(tqp->op_count != 0); + PR_ASSERT(tqp->head->prev == NULL); + PR_ASSERT(tqp->tail->next == NULL); + if (tqp->op_count > 1) + { + PR_ASSERT(tqp->head->next != NULL); + PR_ASSERT(tqp->tail->prev != NULL); + } + else + { + PR_ASSERT(tqp->head->next == NULL); + PR_ASSERT(tqp->tail->prev == NULL); + } + for (tmp = tqp->head, count = 0; tmp != NULL; tmp = tmp->next) count += 1; + PR_ASSERT(count == tqp->op_count); + for (tmp = tqp->tail, count = 0; tmp != NULL; tmp = tmp->prev) count += 1; + PR_ASSERT(count == tqp->op_count); + } +#endif /* defined(DEBUG) */ + +} /* pt_InsertTimedInternal */ + +/* + * function: pt_FinishTimedInternal + * + * Takes the finished operation out of the timed queue. It + * notifies the initiating thread that the opertions is + * complete and returns to the caller the value of the next + * operation in the list (or NULL). + */ +static pt_Continuation *pt_FinishTimedInternal(pt_Continuation *op) +{ + pt_Continuation *next; + struct pt_TimedQueue *tqp = &pt_tqp[op->io_tq_index]; + +#if defined(DEBUG) + { + PRIntn count; + pt_Continuation *tmp; + PR_ASSERT(tqp->head != NULL); + PR_ASSERT(tqp->tail != NULL); + PR_ASSERT(tqp->op_count != 0); + PR_ASSERT(tqp->head->prev == NULL); + PR_ASSERT(tqp->tail->next == NULL); + if (tqp->op_count > 1) + { + PR_ASSERT(tqp->head->next != NULL); + PR_ASSERT(tqp->tail->prev != NULL); + } + else + { + PR_ASSERT(tqp->head->next == NULL); + PR_ASSERT(tqp->tail->prev == NULL); + } + for (tmp = tqp->head, count = 0; tmp != NULL; tmp = tmp->next) count += 1; + PR_ASSERT(count == tqp->op_count); + for (tmp = tqp->tail, count = 0; tmp != NULL; tmp = tmp->prev) count += 1; + PR_ASSERT(count == tqp->op_count); + } +#endif /* defined(DEBUG) */ + + /* remove this one from the list */ + if (NULL == op->prev) tqp->head = op->next; + else op->prev->next = op->next; + if (NULL == op->next) tqp->tail = op->prev; + else op->next->prev = op->prev; + + /* did we happen to hit the timed op? */ + if (op == tqp->op) tqp->op = op->prev; + + next = op->next; + op->next = op->prev = NULL; + op->status = pt_continuation_done; + + tqp->op_count -= 1; + +#if defined(DEBUG) + pt_debug.continuationsServed += 1; +#endif + PR_NotifyCondVar(op->complete); + +#if defined(DEBUG) + { + PRIntn count; + pt_Continuation *tmp; + PR_ASSERT((tqp->head == NULL) == (tqp->tail == NULL)); + PR_ASSERT((tqp->head == NULL) == (tqp->op_count == 0)); + for (tmp = tqp->head, count = 0; tmp != NULL; tmp = tmp->next) count += 1; + PR_ASSERT(count == tqp->op_count); + for (tmp = tqp->tail, count = 0; tmp != NULL; tmp = tmp->prev) count += 1; + PR_ASSERT(count == tqp->op_count); + } +#endif /* defined(DEBUG) */ + + return next; +} /* pt_FinishTimedInternal */ + +static void pt_ContinuationThreadInternal(pt_Continuation *my_op) +{ + /* initialization */ + PRInt32 msecs, mx_poll_ticks; + PRThreadPriority priority; /* used to save caller's prio */ + PRIntn pollingListUsed; /* # entries used in the list */ + PRIntn pollingListNeeded; /* # entries needed this time */ + PRIntn io_tq_index = my_op->io_tq_index; + struct pt_TimedQueue *tqp = &pt_tqp[my_op->io_tq_index]; + struct pollfd *pollingList = tqp->pollingList; + PRIntn pollingSlotsAllocated = tqp->pollingSlotsAllocated; + pt_Continuation **pollingOps = tqp->pollingOps; - PR_ASSERT(PR_INTERVAL_NO_WAIT != op->timeout); - - switch (op->timeout) { - case PR_INTERVAL_NO_TIMEOUT: - msecs = PT_DEFAULT_POLL_MSEC; - do - { - PRIntn rv; - struct pollfd tmp_pfd; - - tmp_pfd.revents = 0; - tmp_pfd.fd = op->arg1.osfd; - tmp_pfd.events = op->event; - - rv = poll(&tmp_pfd, 1, msecs); - - if (self->state & PT_THREAD_ABORTED) - { - self->state &= ~PT_THREAD_ABORTED; - op->result.code = -1; - op->syserrno = EINTR; - op->status = pt_continuation_done; - return; - } - - if ((-1 == rv) && ((errno == EINTR) || (errno == EAGAIN))) - continue; /* go around the loop again */ - - if (rv > 0) - { - PRIntn fd = tmp_pfd.fd; - PRInt16 events = tmp_pfd.events; - PRInt16 revents = tmp_pfd.revents; - - if ((revents & POLLNVAL) /* busted in all cases */ - || ((events & POLLOUT) && (revents & POLLHUP))) - /* write op & hup */ - { - op->result.code = -1; - if (POLLNVAL & revents) op->syserrno = EBADF; - else if (POLLHUP & revents) op->syserrno = EPIPE; - op->status = pt_continuation_done; - } else { - if (op->function(op, revents)) - op->status = pt_continuation_done; - } - } else if (rv == -1) { - op->result.code = -1; - op->syserrno = errno; - op->status = pt_continuation_done; - } - /* else, poll timed out */ - } while (pt_continuation_done != op->status); - break; - default: - now = epoch = PR_IntervalNow(); - remaining = op->timeout; - do - { - PRIntn rv; - struct pollfd tmp_pfd; - - tmp_pfd.revents = 0; - tmp_pfd.fd = op->arg1.osfd; - tmp_pfd.events = op->event; - - msecs = (PRInt32)PR_IntervalToMilliseconds(remaining); - if (msecs > PT_DEFAULT_POLL_MSEC) - msecs = PT_DEFAULT_POLL_MSEC; - rv = poll(&tmp_pfd, 1, msecs); - - if (self->state & PT_THREAD_ABORTED) - { - self->state &= ~PT_THREAD_ABORTED; - op->result.code = -1; - op->syserrno = EINTR; - op->status = pt_continuation_done; - return; - } - - if (rv > 0) - { - PRIntn fd = tmp_pfd.fd; - PRInt16 events = tmp_pfd.events; - PRInt16 revents = tmp_pfd.revents; - - if ((revents & POLLNVAL) /* busted in all cases */ - || ((events & POLLOUT) && (revents & POLLHUP))) - /* write op & hup */ - { - op->result.code = -1; - if (POLLNVAL & revents) op->syserrno = EBADF; - else if (POLLHUP & revents) op->syserrno = EPIPE; - op->status = pt_continuation_done; - } else { - if (op->function(op, revents)) - { - op->status = pt_continuation_done; - } - } - } else if ((rv == 0) || - ((errno == EINTR) || (errno == EAGAIN))) { - if (rv == 0) /* poll timed out */ - now += PR_MillisecondsToInterval(msecs); - else - now = PR_IntervalNow(); - elapsed = (PRIntervalTime) (now - epoch); - if (elapsed >= op->timeout) { - op->result.code = -1; - op->syserrno = ETIMEDOUT; - op->status = pt_continuation_done; - } else - remaining = op->timeout - elapsed; - } else { - op->result.code = -1; - op->syserrno = errno; - op->status = pt_continuation_done; - } - } while (pt_continuation_done != op->status); - break; - } - -} /* pt_poll_now */ + PR_Unlock(tqp->ml); /* don't need that silly lock for a bit */ + + priority = PR_GetThreadPriority(tqp->thread); + PR_SetThreadPriority(tqp->thread, PR_PRIORITY_HIGH); + + mx_poll_ticks = (PRInt32)PR_MillisecondsToInterval(PT_DEFAULT_POLL_MSEC); + + /* do some real work */ + while (PR_TRUE) + { + PRIntn rv; + PRInt32 timeout; + PRIntn pollIndex; + PRIntervalTime now; + pt_Continuation *op, *next_op; + + PR_ASSERT(NULL != tqp->head); + + pollingListNeeded = tqp->op_count; + + /* + * We are not holding the tqp->ml lock now, so more items may + * get added to pt_tq during this window of time. We hope + * that 10 more spaces in the polling list should be enough. + * + * The space allocated is for both a vector that parallels the + * polling list, providing pointers directly into the operation's + * table and the polling list itself. There is a guard element + * between the two structures. + */ + pollingListNeeded += 10; + if (pollingListNeeded > pollingSlotsAllocated) + { + if (NULL != pollingOps) PR_Free(pollingOps); + pollingOps = (pt_Continuation**)PR_Malloc( + sizeof(pt_Continuation**) + pollingListNeeded * + (sizeof(struct pollfd) + sizeof(pt_Continuation*))); + PR_ASSERT(NULL != pollingOps); + tqp->pollingOps = pollingOps; + pollingSlotsAllocated = pollingListNeeded; + tqp->pollingSlotsAllocated = pollingSlotsAllocated; + pollingOps[pollingSlotsAllocated] = (pt_Continuation*)-1; + pollingList = (struct pollfd*)(&pollingOps[pollingSlotsAllocated + 1]); + tqp->pollingList = pollingList; + + } + +#if defined(DEBUG) + if (pollingListNeeded > pt_debug.pollingListMax) + pt_debug.pollingListMax = pollingListNeeded; +#endif + + /* + ** This is interrupt processing. If this thread was interrupted, + ** the thread state will have the PT_THREAD_ABORTED bit set. This + ** overrides good completions as well as timeouts. + ** + ** BTW, it does no good to hold the lock here. This lock doesn't + ** protect the thread structure in any way. Testing the bit and + ** (perhaps) resetting it are safe 'cause it's the only modifiable + ** bit in that word. + */ + if (_PT_THREAD_INTERRUPTED(tqp->thread)) + { + my_op->status = pt_continuation_abort; + tqp->thread->state &= ~PT_THREAD_ABORTED; + } + + + /* + * Build up a polling list. + * This list is sorted on time. Operations that have been + * interrupted are completed and not included in the list. + * There is an assertion that the operation is in progress. + */ + pollingListUsed = 0; + PR_Lock(tqp->ml); + + for (op = tqp->head; NULL != op;) + { + if (pt_continuation_abort == op->status) + { + op->result.code = -1; + op->syserrno = EINTR; + next_op = pt_FinishTimedInternal(op); + if (op == my_op) goto recycle; + else op = next_op; + PR_ASSERT(NULL != tqp->head); + } + else + { + op->status = pt_continuation_pending; + if (pollingListUsed >= pollingSlotsAllocated) + { +#if defined(DEBUG) + pt_debug.predictionsFoiled += 1; +#endif + break; + } + PR_ASSERT((pt_Continuation*)-1 == pollingOps[pollingSlotsAllocated]); + /* + * eventMask bitmasks are declared as PRIntn so that + * each bitmask can be updated individually without + * disturbing adjacent memory, but only the lower 16 + * bits of a bitmask are used. + */ + op->fd->secret->eventMask[io_tq_index] = 0xffff; + pollingOps[pollingListUsed] = op; + pollingList[pollingListUsed].revents = 0; + pollingList[pollingListUsed].fd = op->arg1.osfd; + pollingList[pollingListUsed].events = op->event; + pollingListUsed += 1; + op = op->next; + } + } + + /* + * We don't want to wait forever on this poll. So keep + * the interval down. The operations, if they are timed, + * still have to timeout, while those that are not timed + * should persist forever. But they may be aborted. That's + * what this anxiety is all about. + */ + if (PR_INTERVAL_NO_TIMEOUT == tqp->head->timeout) + msecs = PT_DEFAULT_POLL_MSEC; + else + { + timeout = tqp->head->absolute - PR_IntervalNow(); + if (timeout <= 0) msecs = 0; /* already timed out */ + else if (timeout >= mx_poll_ticks) msecs = PT_DEFAULT_POLL_MSEC; + else msecs = (PRInt32)PR_IntervalToMilliseconds(timeout); + } + + PR_Unlock(tqp->ml); + + /* + * If 'op' isn't NULL at this point, then we didn't get to + * the end of the list. That means that more items got added + * to the list than we anticipated. So, forget this iteration, + * go around the horn again. + * + * One would hope this doesn't happen all that often. + */ + if (NULL != op) continue; /* make it rethink things */ + + PR_ASSERT((pt_Continuation*)-1 == pollingOps[pollingSlotsAllocated]); + + rv = poll(pollingList, pollingListUsed, msecs); + + if ((-1 == rv) && ((errno == EINTR) || (errno == EAGAIN))) + continue; /* go around the loop again */ + + if (rv > 0) + { + /* + * poll() says that something in our list is ready for some more + * action. Find it, load up the operation and see what happens. + */ + + /* + * This may work out okay. The rule is that only this thread, + * the continuation thread, can remove elements from the list. + * Therefore, the list is at worst, longer than when we built + * the polling list. + */ + + for (pollIndex = 0; pollIndex < pollingListUsed; ++pollIndex) + { + PRInt16 events = pollingList[pollIndex].events; + PRInt16 revents = pollingList[pollIndex].revents; + + op = pollingOps[pollIndex]; /* this is the operation */ + + /* (ref: Bug #153459) + ** In case of POLLERR we let the operation retry in hope + ** of getting a more definitive OS error. + */ + if ((revents & POLLNVAL) /* busted in all cases */ + || ((events & POLLOUT) && (revents & POLLHUP))) /* write op & hup */ + { + PR_Lock(tqp->ml); + op->result.code = -1; + if (POLLNVAL & revents) op->syserrno = EBADF; + else if (POLLHUP & revents) op->syserrno = EPIPE; + (void)pt_FinishTimedInternal(op); + if (op == my_op) goto recycle; + PR_Unlock(tqp->ml); + } + else if ((0 != (revents & op->fd->secret->eventMask[io_tq_index])) + && (pt_continuation_pending == op->status)) + { + /* + * Only good?(?) revents left. Operations not pending + * will be pruned next time we build a list. This operation + * will be pruned if the continuation indicates it is + * finished. + */ + + if (op->function(op, revents)) + { + PR_Lock(tqp->ml); + (void)pt_FinishTimedInternal(op); + if (op == my_op) goto recycle; + PR_Unlock(tqp->ml); + } + else + { + /* + * If the continuation function returns + * PR_FALSE, it means available data have + * been read, output buffer space has been + * filled, or pending connections have been + * accepted by prior calls. If the + * continuation function is immediately + * invoked again, it will most likely + * return PR_FALSE. So turn off these + * events in the event mask for this fd so + * that if this fd is encountered again in + * the polling list with these events on, + * we won't invoke the continuation + * function again. + */ + op->fd->secret->eventMask[io_tq_index] &= ~revents; + } + } + } + } + + /* + * This is timeout processing. It is done after checking + * for good completions. Those that just made it under the + * wire are lucky, but none the less, valid. + */ + now = PR_IntervalNow(); + PR_Lock(tqp->ml); + while ((NULL != tqp->head) + && (PR_INTERVAL_NO_TIMEOUT != tqp->head->timeout)) + { + op = tqp->head; /* get a copy of this before finishing it */ + if ((PRInt32)(op->absolute - now) > 0) break; + /* + * The head element of the timed queue has timed out. Record + * the reason for completion and take it out of the list. + */ + op->result.code = -1; + op->syserrno = ETIMEDOUT; + (void)pt_FinishTimedInternal(op); + + /* + * If it's 'my_op' then we have to switch threads. Exit w/o + * finishing the scan. The scan will be completed when another + * thread calls in acting as the continuation thread. + */ + if (op == my_op) goto recycle; /* exit w/o unlocking */ + } + PR_Unlock(tqp->ml); + } + + PR_NOT_REACHED("This is a while(true) loop /w no breaks"); + +recycle: + /* + ** Recycling the continuation thread. + ** + ** The thread we were using for I/O continuations just completed + ** the I/O it submitted. It has to return to it's caller. We need + ** another thread to act in the continuation role. We can do that + ** by taking any operation off the timed queue, setting its state + ** to 'recycle' and notifying the condition. + ** + ** Selecting a likely thread seems like magic. I'm going to try + ** using one that has the longest (or no) timeout, tqp->tail. + ** If that slot's empty, then there's no outstanding I/O and we + ** don't need a thread at all. + ** + ** BTW, we're locked right now, and we'll be returning with the + ** the lock held as well. Seems odd, doesn't it? + */ + + /* $$$ should this be called with the lock held? $$$ */ + PR_SetThreadPriority(tqp->thread, priority); /* reset back to caller's */ + + PR_ASSERT((NULL == tqp->head) == (0 == tqp->op_count)); + PR_ASSERT((NULL == tqp->head) == (NULL == tqp->tail)); + PR_ASSERT(pt_continuation_done == my_op->status); + + if (NULL != tqp->tail) + { + if (tqp->tail->status != pt_continuation_abort) + { + tqp->tail->status = pt_continuation_recycle; + } + PR_NotifyCondVar(tqp->tail->complete); +#if defined(DEBUG) + pt_debug.recyclesNeeded += 1; +#endif + } +#if defined(DEBUG) + else pt_debug.quiescentIO += 1; +#endif + +} /* pt_ContinuationThreadInternal */ static PRIntn pt_Continue(pt_Continuation *op) { @@ -441,13 +889,99 @@ static PRIntn pt_Continue(pt_Continuation *op) PRThread *self = PR_GetCurrentThread(); struct pt_TimedQueue *tqp; + /* lazy assignment of the thread's ioq */ + if (-1 == self->io_tq_index) + { + self->io_tq_index = (PR_AtomicIncrement(&_pt_tq_index)-1) % _pt_tq_count; + } + + PR_ASSERT(self->io_tq_index >= 0); + tqp = &pt_tqp[self->io_tq_index]; + + /* lazy allocation of the thread's cv */ + if (NULL == self->io_cv) + self->io_cv = PR_NewCondVar(tqp->ml); + /* Finish filling in the blank slots */ + op->complete = self->io_cv; op->status = pt_continuation_pending; /* set default value */ - /* - * let each thread call poll directly - */ - pt_poll_now(op); - PR_ASSERT(pt_continuation_done == op->status); - return op->result.code; + op->io_tq_index = self->io_tq_index; + PR_Lock(tqp->ml); /* we provide the locking */ + + pt_InsertTimedInternal(op); /* insert in the structure */ + + /* + ** At this point, we try to decide whether there is a continuation + ** thread, or whether we should assign this one to serve in that role. + */ + do + { + if (NULL == tqp->thread) + { + /* + ** We're the one. Call the processing function with the lock + ** held. It will return with it held as well, though there + ** will certainly be times within the function when it gets + ** released. + */ + tqp->thread = self; /* I'm taking control */ + pt_ContinuationThreadInternal(op); /* go slash and burn */ + PR_ASSERT(pt_continuation_done == op->status); + tqp->thread = NULL; /* I'm abdicating my rule */ + } + else + { + rv = PR_WaitCondVar(op->complete, PR_INTERVAL_NO_TIMEOUT); + /* + * If we get interrupted, we set state the continuation thread will + * see and allow it to finish the I/O operation w/ error. That way + * the rule that only the continuation thread is removing elements + * from the list is still valid. + * + * Don't call interrupt on the continuation thread. That'll just + * irritate him. He's cycling around at least every mx_poll_ticks + * anyhow and should notice the request in there. When he does + * notice, this operation will be finished and the op's status + * marked as pt_continuation_done. + */ + if ((PR_FAILURE == rv) /* the wait was interrupted */ + && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) + { + if (pt_continuation_done == op->status) + { + /* + * The op is done and has been removed + * from the timed queue. We must not + * change op->status, otherwise this + * thread will go around the loop again. + * + * It's harsh to mark the op failed with + * interrupt error when the io is already + * done, but we should indicate the fact + * that the thread was interrupted. So + * we set the aborted flag to abort the + * thread's next blocking call. Is this + * the right thing to do? + */ + self->state |= PT_THREAD_ABORTED; + } + else + { + /* go around the loop again */ + op->status = pt_continuation_abort; + } + } + /* + * If we're to recycle, continue within this loop. This will + * cause this thread to become the continuation thread. + */ + + } + } while (pt_continuation_done != op->status); + + + PR_Unlock(tqp->ml); /* we provided the locking */ + + return op->result.code; /* and the primary answer */ } /* pt_Continue */ /*****************************************************************************/ @@ -756,8 +1290,52 @@ static PRBool pt_hpux_sendfile_cont(pt_Continuation *op, PRInt16 revents) } #endif /* HPUX11 */ +#define _MD_CPUS_ONLINE 2 + void _PR_InitIO() { + PRIntn index; + char *num_io_queues; + + if (NULL != (num_io_queues = getenv("NSPR_NUM_IO_QUEUES"))) + { + _pt_tq_count = atoi(num_io_queues); + } + else + { + /* + * Get the number of CPUs if the pthread + * library has kernel-scheduled entities that + * can run on multiple CPUs. + */ +#ifdef HPUX11 + _pt_num_cpus = pthread_num_processors_np(); +#elif defined(IRIX) || defined(OSF1) + _pt_num_cpus = sysconf(_SC_NPROC_ONLN); +#elif defined(AIX) || defined(LINUX) || defined(SOLARIS) + _pt_num_cpus = sysconf(_SC_NPROCESSORS_ONLN); +#else + /* + * A pure user-level (Mx1) pthread library can + * only use one CPU, even on a multiprocessor. + */ + _pt_num_cpus = 1; +#endif + if (_pt_num_cpus < 0) + _pt_num_cpus = _MD_CPUS_ONLINE; + _pt_tq_count = _pt_num_cpus; + } + + pt_tqp = (struct pt_TimedQueue *) + PR_CALLOC(_pt_tq_count * sizeof(struct pt_TimedQueue)); + PR_ASSERT(NULL != pt_tqp); + + for (index = 0; index < _pt_tq_count; index++) + { + pt_tqp[index].ml = PR_NewLock(); + PR_ASSERT(NULL != pt_tqp[index].ml); + } + #if defined(DEBUG) memset(&pt_debug, 0, sizeof(PTDebug)); pt_debug.timeStarted = PR_Now(); @@ -860,6 +1438,7 @@ static PRInt32 pt_Read(PRFileDesc *fd, void *buf, PRInt32 amount) && (!fd->secret->nonblocking)) { pt_Continuation op; + op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = buf; op.arg3.amount = amount; @@ -900,6 +1479,7 @@ static PRInt32 pt_Write(PRFileDesc *fd, const void *buf, PRInt32 amount) if (fNeedContinue == PR_TRUE) { pt_Continuation op; + op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = (void*)buf; op.arg3.amount = amount; @@ -993,6 +1573,7 @@ static PRInt32 pt_Writev( { pt_Continuation op; + op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = (void*)osiov; op.arg3.amount = osiov_len; @@ -1132,6 +1713,7 @@ static PRStatus pt_Connect( else { pt_Continuation op; + op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; #ifdef _PR_HAVE_SOCKADDR_LEN op.arg2.buffer = (void*)&addrCopy; @@ -1206,6 +1788,7 @@ static PRFileDesc* pt_Accept( else { pt_Continuation op; + op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = addr; op.arg3.addr_len = &addr_len; @@ -1336,6 +1919,7 @@ static PRInt32 pt_Recv( else { pt_Continuation op; + op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = buf; op.arg3.amount = amount; @@ -1420,6 +2004,7 @@ static PRInt32 pt_Send( if (fNeedContinue == PR_TRUE) { pt_Continuation op; + op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = (void*)buf; op.arg3.amount = amount; @@ -1479,6 +2064,7 @@ static PRInt32 pt_SendTo( if (fNeedContinue == PR_TRUE) { pt_Continuation op; + op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = (void*)buf; op.arg3.amount = amount; @@ -1524,6 +2110,7 @@ static PRInt32 pt_RecvFrom(PRFileDesc *fd, void *buf, PRInt32 amount, if (fNeedContinue == PR_TRUE) { pt_Continuation op; + op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = buf; op.arg3.amount = amount; @@ -1651,6 +2238,7 @@ static PRInt32 pt_AIXSendFile(PRFileDesc *sd, PRSendFileData *sfd, if ((rv == 1) || ((rv == -1) && (count == 0))) { pt_Continuation op; + op.fd = sd; op.arg1.osfd = sd->secret->md.osfd; op.arg2.buffer = &sf_struct; op.arg4.flags = send_flags; @@ -1769,6 +2357,7 @@ static PRInt32 pt_HPUXSendFile(PRFileDesc *sd, PRSendFileData *sfd, hdtrl[1].iov_len = sfd->tlen - trailer_nbytes_sent; } + op.fd = sd; op.arg1.osfd = sd->secret->md.osfd; op.filedesc = sfd->fd->secret->md.osfd; op.arg2.buffer = hdtrl; @@ -2326,7 +2915,7 @@ static PRIOMethods _pr_pipe_methods = { pt_Write, pt_Available_s, pt_Available64_s, - pt_Synch, + pt_Fsync, (PRSeekFN)_PR_InvalidInt, (PRSeek64FN)_PR_InvalidInt64, (PRFileInfoFN)_PR_InvalidStatus, @@ -2544,7 +3133,7 @@ PR_IMPLEMENT(const PRIOMethods*) PR_GetFileMethods() PR_IMPLEMENT(const PRIOMethods*) PR_GetPipeMethods() { return &_pr_pipe_methods; -} /* PR_GetPipeMethods */ +} /* PR_GetFileMethods */ PR_IMPLEMENT(const PRIOMethods*) PR_GetTCPMethods() { diff --git a/pr/src/pthreads/ptthread.c b/pr/src/pthreads/ptthread.c index 35f204f4..49f08da1 100644 --- a/pr/src/pthreads/ptthread.c +++ b/pr/src/pthreads/ptthread.c @@ -224,6 +224,7 @@ static PRThread* pt_AttachThread(void) PR_ASSERT(0 == rv); thred->state = PT_THREAD_GLOBAL | PT_THREAD_FOREIGN; + thred->io_tq_index = -1; PR_Lock(pt_book.ml); /* then put it into the list */ @@ -362,6 +363,8 @@ static PRThread* _PR_CreateThread( thred->stack->stackSize = stackSize; thred->stack->thr = thred; + thred->io_tq_index = -1; + #ifdef PT_NO_SIGTIMEDWAIT pthread_mutex_init(&thred->suspendResumeMutex,NULL); pthread_cond_init(&thred->suspendResumeCV,NULL); @@ -739,6 +742,8 @@ static void _pt_thread_death(void *arg) PR_Free(thred->privateData); if (NULL != thred->errorString) PR_Free(thred->errorString); + if (NULL != thred->io_cv) + PR_DestroyCondVar(thred->io_cv); PR_Free(thred->stack); #if defined(DEBUG) memset(thred, 0xaf, sizeof(PRThread)); @@ -804,6 +809,8 @@ void _PR_InitThreads( thred->stack->thr = thred; _PR_InitializeStack(thred->stack); + thred->io_tq_index = -1; + /* * Create a key for our use to store a backpointer in the pthread * to our PRThread object. This object gets deleted when the thread |