diff options
Diffstat (limited to 'pr/src/pthreads/ptio.c')
-rw-r--r-- | pr/src/pthreads/ptio.c | 1290 |
1 files changed, 373 insertions, 917 deletions
diff --git a/pr/src/pthreads/ptio.c b/pr/src/pthreads/ptio.c index f65dbdcc..b301ebd5 100644 --- a/pr/src/pthreads/ptio.c +++ b/pr/src/pthreads/ptio.c @@ -143,7 +143,8 @@ static ssize_t (*pt_aix_sendfile_fptr)() = NULL; #error "Cannot determine architecture" #endif -static PRFileDesc *pt_SetMethods(PRIntn osfd, PRDescType type); +static PRFileDesc *pt_SetMethods( + PRIntn osfd, PRDescType type, PRBool isAcceptedSocket); static PRLock *_pr_flock_lock; /* For PR_LockFile() etc. */ static PRLock *_pr_rename_lock; /* For PR_Rename() */ @@ -153,13 +154,11 @@ static PRLock *_pr_rename_lock; /* For PR_Rename() */ /* These two functions are only used in assertions. */ #if defined(DEBUG) -static PRBool IsValidNetAddr(const PRNetAddr *addr) +PRBool IsValidNetAddr(const PRNetAddr *addr) { if ((addr != NULL) && (addr->raw.family != AF_UNIX) -#ifdef _PR_INET6 - && (addr->raw.family != AF_INET6) -#endif + && (addr->raw.family != PR_AF_INET6) && (addr->raw.family != AF_INET)) { return PR_FALSE; } @@ -190,7 +189,7 @@ static PRBool IsValidNetAddrLen(const PRNetAddr *addr, PRInt32 addr_len) * The polling interval defines the maximum amount of time that a thread * might hang up before an interrupt is noticed. */ -#define PT_DEFAULT_POLL_MSEC 100 +#define PT_DEFAULT_POLL_MSEC 5000 /* * pt_SockLen is the type for the length of a socket address @@ -217,17 +216,11 @@ typedef PRBool (*ContinuationFn)(pt_Continuation *op, PRInt16 revents); typedef enum pr_ContuationStatus { pt_continuation_pending, - pt_continuation_recycle, - pt_continuation_abort, pt_continuation_done } pr_ContuationStatus; struct pt_Continuation { - /* These objects are linked in ascending timeout order */ - pt_Continuation *next, *prev; /* self linked list of these things */ - - PRFileDesc *fd; /* The building of the continuation operation */ ContinuationFn function; /* what function to continue */ union { PRIntn osfd; } arg1; /* #1 - the op's fd */ @@ -258,7 +251,6 @@ struct pt_Continuation #endif /* HPUX11 */ PRIntervalTime timeout; /* client (relative) timeout */ - PRIntervalTime absolute; /* internal (absolute) timeout */ PRInt16 event; /* flags for poll()'s events */ @@ -271,27 +263,8 @@ struct pt_Continuation PRIntn syserrno; /* in case it failed, why (errno) */ pr_ContuationStatus status; /* the status of the operation */ - PRCondVar *complete; /* to notify the initiating thread */ - PRIntn io_tq_index; /* io-queue index */ }; -static struct pt_TimedQueue -{ - PRLock *ml; /* a little protection */ - PRThread *thread; /* internal thread's identification */ - PRUintn op_count; /* number of operations in the list */ - pt_Continuation *head, *tail; /* head/tail of list of operations */ - - pt_Continuation *op; /* timed operation furthest in future */ - struct pollfd *pollingList; /* list built for polling */ - PRIntn pollingSlotsAllocated; /* # entries available in list */ - pt_Continuation **pollingOps; /* list paralleling polling list */ -} *pt_tqp; /* an array */ - -static PRIntn _pt_num_cpus; -PRIntn _pt_tq_count; /* size of the pt_tqp array */ -static PRInt32 _pt_tq_index; /* atomically incremented */ - #if defined(DEBUG) PTDebug pt_debug; /* this is shared between several modules */ @@ -316,16 +289,6 @@ PR_IMPLEMENT(void) PT_FPrintStats(PRFileDesc *debug_out, const char *msg) PR_fprintf( debug_out, "\tstarted: %s[%lld]\n", buffer, elapsed); PR_fprintf( - debug_out, "\tmissed predictions: %u\n", stats.predictionsFoiled); - PR_fprintf( - debug_out, "\tpollingList max: %u\n", stats.pollingListMax); - PR_fprintf( - debug_out, "\tcontinuations served: %u\n", stats.continuationsServed); - PR_fprintf( - debug_out, "\trecycles needed: %u\n", stats.recyclesNeeded); - PR_fprintf( - debug_out, "\tquiescent IO: %u\n", stats.quiescentIO); - PR_fprintf( debug_out, "\tlocks [created: %u, destroyed: %u]\n", stats.locks_created, stats.locks_destroyed); PR_fprintf( @@ -341,647 +304,143 @@ PR_IMPLEMENT(void) PT_FPrintStats(PRFileDesc *debug_out, const char *msg) #endif /* DEBUG */ -/* - * The following two functions, pt_InsertTimedInternal and - * pt_FinishTimedInternal, are always called with the tqp->ml - * lock held. The "internal" in the functions' names come from - * the Mesa programming language. Internal functions are always - * called from inside a monitor. - */ - -static void pt_InsertTimedInternal(pt_Continuation *op) -{ - pt_Continuation *t_op = NULL; - PRIntervalTime now = PR_IntervalNow(); - struct pt_TimedQueue *tqp = &pt_tqp[op->io_tq_index]; - -#if defined(DEBUG) - { - PRIntn count; - pt_Continuation *tmp; - PRThread *self = PR_GetCurrentThread(); - - PR_ASSERT(tqp == &pt_tqp[self->io_tq_index]); - PR_ASSERT((tqp->head == NULL) == (tqp->tail == NULL)); - PR_ASSERT((tqp->head == NULL) == (tqp->op_count == 0)); - for (tmp = tqp->head, count = 0; tmp != NULL; tmp = tmp->next) count += 1; - PR_ASSERT(count == tqp->op_count); - for (tmp = tqp->tail, count = 0; tmp != NULL; tmp = tmp->prev) count += 1; - PR_ASSERT(count == tqp->op_count); - for (tmp = tqp->head; tmp != NULL; tmp = tmp->next) - PR_ASSERT(tmp->io_tq_index == op->io_tq_index); - } -#endif /* defined(DEBUG) */ - - /* - * If this element operation isn't timed, it gets queued at the - * end of the list (just after tqp->tail) and we're - * finishd early. - */ - if (PR_INTERVAL_NO_TIMEOUT == op->timeout) - { - t_op = tqp->tail; /* put it at the end */ - goto done; - } - - /* - * The portion of this routine deals with timed ops. - */ - op->absolute = now + op->timeout; /* absolute ticks */ - if (NULL == tqp->op) tqp->op = op; - else - { - /* - * To find where in the list to put the new operation, based - * on the absolute time the operation in question will expire. - * - * The new operation ('op') will expire at now() + op->timeout. - * - * This should be easy! - */ - - for (t_op = tqp->op; NULL != t_op; t_op = t_op->prev) - { - /* - * If 'op' expires later than t_op, then insert 'op' just - * ahead of t_op. Otherwise, compute when operation[n-1] - * expires and try again. - * - * The actual difference between the expiriation of 'op' - * and the current operation what becomes the new operaton's - * timeout interval. That interval is also subtracted from - * the interval of the operation immediately following where - * we stick 'op' (unless the next one isn't timed). The new - * timeout assigned to 'op' takes into account the values of - * now() and when the previous intervals were computed. - */ - if ((PRInt32)(op->absolute - t_op->absolute) >= 0) - { - if (t_op == tqp->op) tqp->op = op; - break; - } - } - } - -done: - - /* - * Insert 'op' into the queue just after t_op or if t_op is null, - * at the head of the list. - * - * We need to set up the 'next' and 'prev' pointers of 'op' - * correctly before inserting 'op' into the queue. Also, we insert - * 'op' by updating tqp->head or op->prev->next first, and then - * updating op->next->prev. We want to make sure that the 'next' - * pointers are linked up correctly at all times so that we can - * traverse the queue by starting with tqp->head and following - * the 'next' pointers, without having to acquire the tqp->ml lock. - * (we do that in pt_ContinuationThreadInternal). We traverse the 'prev' - * pointers only in this function, which is called with the lock held. - * - * Similar care is taken in pt_FinishTimedInternal where we remove - * an op from the queue. - */ - if (NULL == t_op) - { - op->prev = NULL; - op->next = tqp->head; - tqp->head = op; - if (NULL == tqp->tail) tqp->tail = op; - else op->next->prev = op; - } - else - { - op->prev = t_op; - op->next = t_op->next; - if (NULL != op->prev) - op->prev->next = op; - if (NULL != op->next) - op->next->prev = op; - if (t_op == tqp->tail) - tqp->tail = op; - } - - tqp->op_count += 1; - -#if defined(DEBUG) - { - PRIntn count; - pt_Continuation *tmp; - PR_ASSERT(tqp->head != NULL); - PR_ASSERT(tqp->tail != NULL); - PR_ASSERT(tqp->op_count != 0); - PR_ASSERT(tqp->head->prev == NULL); - PR_ASSERT(tqp->tail->next == NULL); - if (tqp->op_count > 1) - { - PR_ASSERT(tqp->head->next != NULL); - PR_ASSERT(tqp->tail->prev != NULL); - } - else - { - PR_ASSERT(tqp->head->next == NULL); - PR_ASSERT(tqp->tail->prev == NULL); - } - for (tmp = tqp->head, count = 0; tmp != NULL; tmp = tmp->next) count += 1; - PR_ASSERT(count == tqp->op_count); - for (tmp = tqp->tail, count = 0; tmp != NULL; tmp = tmp->prev) count += 1; - PR_ASSERT(count == tqp->op_count); - } -#endif /* defined(DEBUG) */ - -} /* pt_InsertTimedInternal */ - -/* - * function: pt_FinishTimedInternal - * - * Takes the finished operation out of the timed queue. It - * notifies the initiating thread that the opertions is - * complete and returns to the caller the value of the next - * operation in the list (or NULL). - */ -static pt_Continuation *pt_FinishTimedInternal(pt_Continuation *op) +static void pt_poll_now(pt_Continuation *op) { - pt_Continuation *next; - struct pt_TimedQueue *tqp = &pt_tqp[op->io_tq_index]; - -#if defined(DEBUG) - { - PRIntn count; - pt_Continuation *tmp; - PR_ASSERT(tqp->head != NULL); - PR_ASSERT(tqp->tail != NULL); - PR_ASSERT(tqp->op_count != 0); - PR_ASSERT(tqp->head->prev == NULL); - PR_ASSERT(tqp->tail->next == NULL); - if (tqp->op_count > 1) - { - PR_ASSERT(tqp->head->next != NULL); - PR_ASSERT(tqp->tail->prev != NULL); - } - else - { - PR_ASSERT(tqp->head->next == NULL); - PR_ASSERT(tqp->tail->prev == NULL); - } - for (tmp = tqp->head, count = 0; tmp != NULL; tmp = tmp->next) count += 1; - PR_ASSERT(count == tqp->op_count); - for (tmp = tqp->tail, count = 0; tmp != NULL; tmp = tmp->prev) count += 1; - PR_ASSERT(count == tqp->op_count); - } -#endif /* defined(DEBUG) */ - - /* remove this one from the list */ - if (NULL == op->prev) tqp->head = op->next; - else op->prev->next = op->next; - if (NULL == op->next) tqp->tail = op->prev; - else op->next->prev = op->prev; - - /* did we happen to hit the timed op? */ - if (op == tqp->op) tqp->op = op->prev; - - next = op->next; - op->next = op->prev = NULL; - op->status = pt_continuation_done; - - tqp->op_count -= 1; - -#if defined(DEBUG) - pt_debug.continuationsServed += 1; -#endif - PR_NotifyCondVar(op->complete); - -#if defined(DEBUG) - { - PRIntn count; - pt_Continuation *tmp; - PR_ASSERT((tqp->head == NULL) == (tqp->tail == NULL)); - PR_ASSERT((tqp->head == NULL) == (tqp->op_count == 0)); - for (tmp = tqp->head, count = 0; tmp != NULL; tmp = tmp->next) count += 1; - PR_ASSERT(count == tqp->op_count); - for (tmp = tqp->tail, count = 0; tmp != NULL; tmp = tmp->prev) count += 1; - PR_ASSERT(count == tqp->op_count); - } -#endif /* defined(DEBUG) */ - - return next; -} /* pt_FinishTimedInternal */ - -static void pt_ContinuationThreadInternal(pt_Continuation *my_op) -{ - /* initialization */ - PRInt32 msecs, mx_poll_ticks; - PRThreadPriority priority; /* used to save caller's prio */ - PRIntn pollingListUsed; /* # entries used in the list */ - PRIntn pollingListNeeded; /* # entries needed this time */ - PRIntn io_tq_index = my_op->io_tq_index; - struct pt_TimedQueue *tqp = &pt_tqp[my_op->io_tq_index]; - struct pollfd *pollingList = tqp->pollingList; - PRIntn pollingSlotsAllocated = tqp->pollingSlotsAllocated; - pt_Continuation **pollingOps = tqp->pollingOps; - - PR_Unlock(tqp->ml); /* don't need that silly lock for a bit */ - - priority = PR_GetThreadPriority(tqp->thread); - PR_SetThreadPriority(tqp->thread, PR_PRIORITY_HIGH); - - mx_poll_ticks = (PRInt32)PR_MillisecondsToInterval(PT_DEFAULT_POLL_MSEC); - - /* do some real work */ - while (PR_TRUE) - { - PRIntn rv; - PRInt32 timeout; - PRIntn pollIndex; - PRIntervalTime now; - pt_Continuation *op, *next_op; - - PR_ASSERT(NULL != tqp->head); - - pollingListNeeded = tqp->op_count; - - /* - * We are not holding the tqp->ml lock now, so more items may - * get added to pt_tq during this window of time. We hope - * that 10 more spaces in the polling list should be enough. - * - * The space allocated is for both a vector that parallels the - * polling list, providing pointers directly into the operation's - * table and the polling list itself. There is a guard element - * between the two structures. - */ - pollingListNeeded += 10; - if (pollingListNeeded > pollingSlotsAllocated) - { - if (NULL != pollingOps) PR_Free(pollingOps); - pollingOps = (pt_Continuation**)PR_Malloc( - sizeof(pt_Continuation**) + pollingListNeeded * - (sizeof(struct pollfd) + sizeof(pt_Continuation*))); - PR_ASSERT(NULL != pollingOps); - tqp->pollingOps = pollingOps; - pollingSlotsAllocated = pollingListNeeded; - tqp->pollingSlotsAllocated = pollingSlotsAllocated; - pollingOps[pollingSlotsAllocated] = (pt_Continuation*)-1; - pollingList = (struct pollfd*)(&pollingOps[pollingSlotsAllocated + 1]); - tqp->pollingList = pollingList; - - } - -#if defined(DEBUG) - if (pollingListNeeded > pt_debug.pollingListMax) - pt_debug.pollingListMax = pollingListNeeded; -#endif - - /* - ** This is interrupt processing. If this thread was interrupted, - ** the thread state will have the PT_THREAD_ABORTED bit set. This - ** overrides good completions as well as timeouts. - ** - ** BTW, it does no good to hold the lock here. This lock doesn't - ** protect the thread structure in any way. Testing the bit and - ** (perhaps) resetting it are safe 'cause it's the only modifiable - ** bit in that word. - */ - if (_PT_THREAD_INTERRUPTED(tqp->thread)) - { - my_op->status = pt_continuation_abort; - tqp->thread->state &= ~PT_THREAD_ABORTED; - } - - - /* - * Build up a polling list. - * This list is sorted on time. Operations that have been - * interrupted are completed and not included in the list. - * There is an assertion that the operation is in progress. - */ - pollingListUsed = 0; - PR_Lock(tqp->ml); - - for (op = tqp->head; NULL != op;) - { - if (pt_continuation_abort == op->status) - { - op->result.code = -1; - op->syserrno = EINTR; - next_op = pt_FinishTimedInternal(op); - if (op == my_op) goto recycle; - else op = next_op; - PR_ASSERT(NULL != tqp->head); - } - else - { - op->status = pt_continuation_pending; - if (pollingListUsed >= pollingSlotsAllocated) - { -#if defined(DEBUG) - pt_debug.predictionsFoiled += 1; -#endif - break; - } - PR_ASSERT((pt_Continuation*)-1 == pollingOps[pollingSlotsAllocated]); - /* - * eventMask bitmasks are declared as PRIntn so that - * each bitmask can be updated individually without - * disturbing adjacent memory, but only the lower 16 - * bits of a bitmask are used. - */ - op->fd->secret->eventMask[io_tq_index] = 0xffff; - pollingOps[pollingListUsed] = op; - pollingList[pollingListUsed].revents = 0; - pollingList[pollingListUsed].fd = op->arg1.osfd; - pollingList[pollingListUsed].events = op->event; - pollingListUsed += 1; - op = op->next; - } - } - - /* - * We don't want to wait forever on this poll. So keep - * the interval down. The operations, if they are timed, - * still have to timeout, while those that are not timed - * should persist forever. But they may be aborted. That's - * what this anxiety is all about. - */ - if (PR_INTERVAL_NO_TIMEOUT == tqp->head->timeout) - msecs = PT_DEFAULT_POLL_MSEC; - else - { - timeout = tqp->head->absolute - PR_IntervalNow(); - if (timeout <= 0) msecs = 0; /* already timed out */ - else if (timeout >= mx_poll_ticks) msecs = PT_DEFAULT_POLL_MSEC; - else msecs = (PRInt32)PR_IntervalToMilliseconds(timeout); - } - - PR_Unlock(tqp->ml); - - /* - * If 'op' isn't NULL at this point, then we didn't get to - * the end of the list. That means that more items got added - * to the list than we anticipated. So, forget this iteration, - * go around the horn again. - * - * One would hope this doesn't happen all that often. - */ - if (NULL != op) continue; /* make it rethink things */ - - PR_ASSERT((pt_Continuation*)-1 == pollingOps[pollingSlotsAllocated]); - - rv = poll(pollingList, pollingListUsed, msecs); - - if ((-1 == rv) && ((errno == EINTR) || (errno == EAGAIN))) - continue; /* go around the loop again */ - - if (rv > 0) - { - /* - * poll() says that something in our list is ready for some more - * action. Find it, load up the operation and see what happens. - */ - - /* - * This may work out okay. The rule is that only this thread, - * the continuation thread, can remove elements from the list. - * Therefore, the list is at worst, longer than when we built - * the polling list. - */ - - for (pollIndex = 0; pollIndex < pollingListUsed; ++pollIndex) - { - PRInt16 events = pollingList[pollIndex].events; - PRInt16 revents = pollingList[pollIndex].revents; - - op = pollingOps[pollIndex]; /* this is the operation */ - - /* (ref: Bug #153459) - ** In case of POLLERR we let the operation retry in hope - ** of getting a more definitive OS error. - */ - if ((revents & POLLNVAL) /* busted in all cases */ - || ((events & POLLOUT) && (revents & POLLHUP))) /* write op & hup */ - { - PR_Lock(tqp->ml); - op->result.code = -1; - if (POLLNVAL & revents) op->syserrno = EBADF; - else if (POLLHUP & revents) op->syserrno = EPIPE; - (void)pt_FinishTimedInternal(op); - if (op == my_op) goto recycle; - PR_Unlock(tqp->ml); - } - else if ((0 != (revents & op->fd->secret->eventMask[io_tq_index])) - && (pt_continuation_pending == op->status)) - { - /* - * Only good?(?) revents left. Operations not pending - * will be pruned next time we build a list. This operation - * will be pruned if the continuation indicates it is - * finished. - */ - - if (op->function(op, revents)) - { - PR_Lock(tqp->ml); - (void)pt_FinishTimedInternal(op); - if (op == my_op) goto recycle; - PR_Unlock(tqp->ml); - } - else - { - /* - * If the continuation function returns - * PR_FALSE, it means available data have - * been read, output buffer space has been - * filled, or pending connections have been - * accepted by prior calls. If the - * continuation function is immediately - * invoked again, it will most likely - * return PR_FALSE. So turn off these - * events in the event mask for this fd so - * that if this fd is encountered again in - * the polling list with these events on, - * we won't invoke the continuation - * function again. - */ - op->fd->secret->eventMask[io_tq_index] &= ~revents; - } - } - } - } - - /* - * This is timeout processing. It is done after checking - * for good completions. Those that just made it under the - * wire are lucky, but none the less, valid. - */ - now = PR_IntervalNow(); - PR_Lock(tqp->ml); - while ((NULL != tqp->head) - && (PR_INTERVAL_NO_TIMEOUT != tqp->head->timeout)) - { - op = tqp->head; /* get a copy of this before finishing it */ - if ((PRInt32)(op->absolute - now) > 0) break; - /* - * The head element of the timed queue has timed out. Record - * the reason for completion and take it out of the list. - */ - op->result.code = -1; - op->syserrno = ETIMEDOUT; - (void)pt_FinishTimedInternal(op); - - /* - * If it's 'my_op' then we have to switch threads. Exit w/o - * finishing the scan. The scan will be completed when another - * thread calls in acting as the continuation thread. - */ - if (op == my_op) goto recycle; /* exit w/o unlocking */ - } - PR_Unlock(tqp->ml); - } - - PR_NOT_REACHED("This is a while(true) loop /w no breaks"); - -recycle: - /* - ** Recycling the continuation thread. - ** - ** The thread we were using for I/O continuations just completed - ** the I/O it submitted. It has to return to it's caller. We need - ** another thread to act in the continuation role. We can do that - ** by taking any operation off the timed queue, setting its state - ** to 'recycle' and notifying the condition. - ** - ** Selecting a likely thread seems like magic. I'm going to try - ** using one that has the longest (or no) timeout, tqp->tail. - ** If that slot's empty, then there's no outstanding I/O and we - ** don't need a thread at all. - ** - ** BTW, we're locked right now, and we'll be returning with the - ** the lock held as well. Seems odd, doesn't it? - */ - - /* $$$ should this be called with the lock held? $$$ */ - PR_SetThreadPriority(tqp->thread, priority); /* reset back to caller's */ - - PR_ASSERT((NULL == tqp->head) == (0 == tqp->op_count)); - PR_ASSERT((NULL == tqp->head) == (NULL == tqp->tail)); - PR_ASSERT(pt_continuation_done == my_op->status); + PRInt32 msecs; + PRIntervalTime epoch, now, elapsed, remaining; + PRThread *self = PR_GetCurrentThread(); - if (NULL != tqp->tail) - { - if (tqp->tail->status != pt_continuation_abort) - { - tqp->tail->status = pt_continuation_recycle; - } - PR_NotifyCondVar(tqp->tail->complete); -#if defined(DEBUG) - pt_debug.recyclesNeeded += 1; -#endif - } -#if defined(DEBUG) - else pt_debug.quiescentIO += 1; -#endif - -} /* pt_ContinuationThreadInternal */ + PR_ASSERT(PR_INTERVAL_NO_WAIT != op->timeout); + + switch (op->timeout) { + case PR_INTERVAL_NO_TIMEOUT: + msecs = PT_DEFAULT_POLL_MSEC; + do + { + PRIntn rv; + struct pollfd tmp_pfd; + + tmp_pfd.revents = 0; + tmp_pfd.fd = op->arg1.osfd; + tmp_pfd.events = op->event; + + rv = poll(&tmp_pfd, 1, msecs); + + if (self->state & PT_THREAD_ABORTED) + { + self->state &= ~PT_THREAD_ABORTED; + op->result.code = -1; + op->syserrno = EINTR; + op->status = pt_continuation_done; + return; + } + + if ((-1 == rv) && ((errno == EINTR) || (errno == EAGAIN))) + continue; /* go around the loop again */ + + if (rv > 0) + { + PRInt16 events = tmp_pfd.events; + PRInt16 revents = tmp_pfd.revents; + + if ((revents & POLLNVAL) /* busted in all cases */ + || ((events & POLLOUT) && (revents & POLLHUP))) + /* write op & hup */ + { + op->result.code = -1; + if (POLLNVAL & revents) op->syserrno = EBADF; + else if (POLLHUP & revents) op->syserrno = EPIPE; + op->status = pt_continuation_done; + } else { + if (op->function(op, revents)) + op->status = pt_continuation_done; + } + } else if (rv == -1) { + op->result.code = -1; + op->syserrno = errno; + op->status = pt_continuation_done; + } + /* else, poll timed out */ + } while (pt_continuation_done != op->status); + break; + default: + now = epoch = PR_IntervalNow(); + remaining = op->timeout; + do + { + PRIntn rv; + struct pollfd tmp_pfd; + + tmp_pfd.revents = 0; + tmp_pfd.fd = op->arg1.osfd; + tmp_pfd.events = op->event; + + msecs = (PRInt32)PR_IntervalToMilliseconds(remaining); + if (msecs > PT_DEFAULT_POLL_MSEC) + msecs = PT_DEFAULT_POLL_MSEC; + rv = poll(&tmp_pfd, 1, msecs); + + if (self->state & PT_THREAD_ABORTED) + { + self->state &= ~PT_THREAD_ABORTED; + op->result.code = -1; + op->syserrno = EINTR; + op->status = pt_continuation_done; + return; + } + + if (rv > 0) + { + PRInt16 events = tmp_pfd.events; + PRInt16 revents = tmp_pfd.revents; + + if ((revents & POLLNVAL) /* busted in all cases */ + || ((events & POLLOUT) && (revents & POLLHUP))) + /* write op & hup */ + { + op->result.code = -1; + if (POLLNVAL & revents) op->syserrno = EBADF; + else if (POLLHUP & revents) op->syserrno = EPIPE; + op->status = pt_continuation_done; + } else { + if (op->function(op, revents)) + { + op->status = pt_continuation_done; + } + } + } else if ((rv == 0) || + ((errno == EINTR) || (errno == EAGAIN))) { + if (rv == 0) /* poll timed out */ + now += PR_MillisecondsToInterval(msecs); + else + now = PR_IntervalNow(); + elapsed = (PRIntervalTime) (now - epoch); + if (elapsed >= op->timeout) { + op->result.code = -1; + op->syserrno = ETIMEDOUT; + op->status = pt_continuation_done; + } else + remaining = op->timeout - elapsed; + } else { + op->result.code = -1; + op->syserrno = errno; + op->status = pt_continuation_done; + } + } while (pt_continuation_done != op->status); + break; + } + +} /* pt_poll_now */ static PRIntn pt_Continue(pt_Continuation *op) { - PRStatus rv; - PRThread *self = PR_GetCurrentThread(); - struct pt_TimedQueue *tqp; - - /* lazy assignment of the thread's ioq */ - if (-1 == self->io_tq_index) - { - self->io_tq_index = (PR_AtomicIncrement(&_pt_tq_index)-1) % _pt_tq_count; - } - - PR_ASSERT(self->io_tq_index >= 0); - tqp = &pt_tqp[self->io_tq_index]; - - /* lazy allocation of the thread's cv */ - if (NULL == self->io_cv) - self->io_cv = PR_NewCondVar(tqp->ml); - /* Finish filling in the blank slots */ - op->complete = self->io_cv; op->status = pt_continuation_pending; /* set default value */ - op->io_tq_index = self->io_tq_index; - PR_Lock(tqp->ml); /* we provide the locking */ - - pt_InsertTimedInternal(op); /* insert in the structure */ - - /* - ** At this point, we try to decide whether there is a continuation - ** thread, or whether we should assign this one to serve in that role. - */ - do - { - if (NULL == tqp->thread) - { - /* - ** We're the one. Call the processing function with the lock - ** held. It will return with it held as well, though there - ** will certainly be times within the function when it gets - ** released. - */ - tqp->thread = self; /* I'm taking control */ - pt_ContinuationThreadInternal(op); /* go slash and burn */ - PR_ASSERT(pt_continuation_done == op->status); - tqp->thread = NULL; /* I'm abdicating my rule */ - } - else - { - rv = PR_WaitCondVar(op->complete, PR_INTERVAL_NO_TIMEOUT); - /* - * If we get interrupted, we set state the continuation thread will - * see and allow it to finish the I/O operation w/ error. That way - * the rule that only the continuation thread is removing elements - * from the list is still valid. - * - * Don't call interrupt on the continuation thread. That'll just - * irritate him. He's cycling around at least every mx_poll_ticks - * anyhow and should notice the request in there. When he does - * notice, this operation will be finished and the op's status - * marked as pt_continuation_done. - */ - if ((PR_FAILURE == rv) /* the wait was interrupted */ - && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) - { - if (pt_continuation_done == op->status) - { - /* - * The op is done and has been removed - * from the timed queue. We must not - * change op->status, otherwise this - * thread will go around the loop again. - * - * It's harsh to mark the op failed with - * interrupt error when the io is already - * done, but we should indicate the fact - * that the thread was interrupted. So - * we set the aborted flag to abort the - * thread's next blocking call. Is this - * the right thing to do? - */ - self->state |= PT_THREAD_ABORTED; - } - else - { - /* go around the loop again */ - op->status = pt_continuation_abort; - } - } - /* - * If we're to recycle, continue within this loop. This will - * cause this thread to become the continuation thread. - */ - - } - } while (pt_continuation_done != op->status); - - - PR_Unlock(tqp->ml); /* we provided the locking */ - - return op->result.code; /* and the primary answer */ + /* + * let each thread call poll directly + */ + pt_poll_now(op); + PR_ASSERT(pt_continuation_done == op->status); + return op->result.code; } /* pt_Continue */ /*****************************************************************************/ @@ -1290,52 +749,8 @@ static PRBool pt_hpux_sendfile_cont(pt_Continuation *op, PRInt16 revents) } #endif /* HPUX11 */ -#define _MD_CPUS_ONLINE 2 - void _PR_InitIO() { - PRIntn index; - char *num_io_queues; - - if (NULL != (num_io_queues = getenv("NSPR_NUM_IO_QUEUES"))) - { - _pt_tq_count = atoi(num_io_queues); - } - else - { - /* - * Get the number of CPUs if the pthread - * library has kernel-scheduled entities that - * can run on multiple CPUs. - */ -#ifdef HPUX11 - _pt_num_cpus = pthread_num_processors_np(); -#elif defined(IRIX) || defined(OSF1) - _pt_num_cpus = sysconf(_SC_NPROC_ONLN); -#elif defined(AIX) || defined(LINUX) || defined(SOLARIS) - _pt_num_cpus = sysconf(_SC_NPROCESSORS_ONLN); -#else - /* - * A pure user-level (Mx1) pthread library can - * only use one CPU, even on a multiprocessor. - */ - _pt_num_cpus = 1; -#endif - if (_pt_num_cpus < 0) - _pt_num_cpus = _MD_CPUS_ONLINE; - _pt_tq_count = _pt_num_cpus; - } - - pt_tqp = (struct pt_TimedQueue *) - PR_CALLOC(_pt_tq_count * sizeof(struct pt_TimedQueue)); - PR_ASSERT(NULL != pt_tqp); - - for (index = 0; index < _pt_tq_count; index++) - { - pt_tqp[index].ml = PR_NewLock(); - PR_ASSERT(NULL != pt_tqp[index].ml); - } - #if defined(DEBUG) memset(&pt_debug, 0, sizeof(PTDebug)); pt_debug.timeStarted = PR_Now(); @@ -1348,10 +763,11 @@ void _PR_InitIO() _PR_InitFdCache(); /* do that */ - _pr_stdin = pt_SetMethods(0, PR_DESC_FILE); - _pr_stdout = pt_SetMethods(1, PR_DESC_FILE); - _pr_stderr = pt_SetMethods(2, PR_DESC_FILE); + _pr_stdin = pt_SetMethods(0, PR_DESC_FILE, PR_FALSE); + _pr_stdout = pt_SetMethods(1, PR_DESC_FILE, PR_FALSE); + _pr_stderr = pt_SetMethods(2, PR_DESC_FILE, PR_FALSE); PR_ASSERT(_pr_stdin && _pr_stdout && _pr_stderr); + } /* _PR_InitIO */ PR_IMPLEMENT(PRFileDesc*) PR_GetSpecialFD(PRSpecialFD osfd) @@ -1438,7 +854,6 @@ static PRInt32 pt_Read(PRFileDesc *fd, void *buf, PRInt32 amount) && (!fd->secret->nonblocking)) { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = buf; op.arg3.amount = amount; @@ -1479,7 +894,6 @@ static PRInt32 pt_Write(PRFileDesc *fd, const void *buf, PRInt32 amount) if (fNeedContinue == PR_TRUE) { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = (void*)buf; op.arg3.amount = amount; @@ -1573,7 +987,6 @@ static PRInt32 pt_Writev( { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = (void*)osiov; op.arg3.amount = osiov_len; @@ -1690,7 +1103,9 @@ static PRStatus pt_Connect( { PRIntn rv = -1, syserrno; pt_SockLen addr_len; -#ifdef _PR_HAVE_SOCKADDR_LEN + const PRNetAddr *addrp = addr; +#if defined(_PR_HAVE_SOCKADDR_LEN) || defined(_PR_INET6) + PRUint16 md_af = addr->raw.family; PRNetAddr addrCopy; #endif @@ -1698,13 +1113,24 @@ static PRStatus pt_Connect( PR_ASSERT(IsValidNetAddr(addr) == PR_TRUE); addr_len = PR_NETADDR_SIZE(addr); +#if defined(_PR_INET6) + if (addr->raw.family == PR_AF_INET6) { + md_af = AF_INET6; +#ifndef _PR_HAVE_SOCKADDR_LEN + addrCopy = *addr; + addrCopy.raw.family = AF_INET6; + addrp = &addrCopy; +#endif + } +#endif + #ifdef _PR_HAVE_SOCKADDR_LEN addrCopy = *addr; ((struct sockaddr*)&addrCopy)->sa_len = addr_len; - ((struct sockaddr*)&addrCopy)->sa_family = addr->raw.family; + ((struct sockaddr*)&addrCopy)->sa_family = md_af; rv = connect(fd->secret->md.osfd, (struct sockaddr*)&addrCopy, addr_len); #else - rv = connect(fd->secret->md.osfd, (struct sockaddr*)addr, addr_len); + rv = connect(fd->secret->md.osfd, (struct sockaddr*)addrp, addr_len); #endif syserrno = errno; if ((-1 == rv) && (EINPROGRESS == syserrno) && (!fd->secret->nonblocking)) @@ -1713,7 +1139,6 @@ static PRStatus pt_Connect( else { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; #ifdef _PR_HAVE_SOCKADDR_LEN op.arg2.buffer = (void*)&addrCopy; @@ -1788,7 +1213,6 @@ static PRFileDesc* pt_Accept( else { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = addr; op.arg3.addr_len = &addr_len; @@ -1808,7 +1232,11 @@ static PRFileDesc* pt_Accept( addr->raw.family = ((struct sockaddr*)addr)->sa_family; } #endif /* _PR_HAVE_SOCKADDR_LEN */ - newfd = pt_SetMethods(osfd, PR_DESC_SOCKET_TCP); +#ifdef _PR_INET6 + if (addr && (AF_INET6 == addr->raw.family)) + addr->raw.family = PR_AF_INET6; +#endif + newfd = pt_SetMethods(osfd, PR_DESC_SOCKET_TCP, PR_TRUE); if (newfd == NULL) close(osfd); /* $$$ whoops! this doesn't work $$$ */ else { @@ -1826,14 +1254,15 @@ static PRStatus pt_Bind(PRFileDesc *fd, const PRNetAddr *addr) { PRIntn rv; pt_SockLen addr_len; -#ifdef _PR_HAVE_SOCKADDR_LEN + const PRNetAddr *addrp = addr; +#if defined(_PR_HAVE_SOCKADDR_LEN) || defined(_PR_INET6) + PRUint16 md_af = addr->raw.family; PRNetAddr addrCopy; #endif if (pt_TestAbort()) return PR_FAILURE; PR_ASSERT(IsValidNetAddr(addr) == PR_TRUE); - if (addr->raw.family == AF_UNIX) { /* Disallow relative pathnames */ @@ -1844,14 +1273,25 @@ static PRStatus pt_Bind(PRFileDesc *fd, const PRNetAddr *addr) } } +#if defined(_PR_INET6) + if (addr->raw.family == PR_AF_INET6) { + md_af = AF_INET6; +#ifndef _PR_HAVE_SOCKADDR_LEN + addrCopy = *addr; + addrCopy.raw.family = AF_INET6; + addrp = &addrCopy; +#endif + } +#endif + addr_len = PR_NETADDR_SIZE(addr); #ifdef _PR_HAVE_SOCKADDR_LEN addrCopy = *addr; ((struct sockaddr*)&addrCopy)->sa_len = addr_len; - ((struct sockaddr*)&addrCopy)->sa_family = addr->raw.family; + ((struct sockaddr*)&addrCopy)->sa_family = md_af; rv = bind(fd->secret->md.osfd, (struct sockaddr*)&addrCopy, addr_len); #else - rv = bind(fd->secret->md.osfd, (struct sockaddr*)addr, addr_len); + rv = bind(fd->secret->md.osfd, (struct sockaddr*)addrp, addr_len); #endif if (rv == -1) { @@ -1919,7 +1359,6 @@ static PRInt32 pt_Recv( else { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = buf; op.arg3.amount = amount; @@ -2004,7 +1443,6 @@ static PRInt32 pt_Send( if (fNeedContinue == PR_TRUE) { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = (void*)buf; op.arg3.amount = amount; @@ -2034,25 +1472,38 @@ static PRInt32 pt_SendTo( PRInt32 syserrno, bytes = -1; PRBool fNeedContinue = PR_FALSE; pt_SockLen addr_len; -#ifdef _PR_HAVE_SOCKADDR_LEN + const PRNetAddr *addrp = addr; +#if defined(_PR_HAVE_SOCKADDR_LEN) || defined(_PR_INET6) + PRUint16 md_af = addr->raw.family; PRNetAddr addrCopy; #endif if (pt_TestAbort()) return bytes; PR_ASSERT(IsValidNetAddr(addr) == PR_TRUE); +#if defined(_PR_INET6) + if (addr->raw.family == PR_AF_INET6) { + md_af = AF_INET6; +#ifndef _PR_HAVE_SOCKADDR_LEN + addrCopy = *addr; + addrCopy.raw.family = AF_INET6; + addrp = &addrCopy; +#endif + } +#endif + addr_len = PR_NETADDR_SIZE(addr); #ifdef _PR_HAVE_SOCKADDR_LEN addrCopy = *addr; ((struct sockaddr*)&addrCopy)->sa_len = addr_len; - ((struct sockaddr*)&addrCopy)->sa_family = addr->raw.family; + ((struct sockaddr*)&addrCopy)->sa_family = md_af; bytes = sendto( fd->secret->md.osfd, buf, amount, flags, (struct sockaddr*)&addrCopy, addr_len); #else bytes = sendto( fd->secret->md.osfd, buf, amount, flags, - (struct sockaddr*)addr, addr_len); + (struct sockaddr*)addrp, addr_len); #endif syserrno = errno; if ( (bytes == -1) && (syserrno == EWOULDBLOCK || syserrno == EAGAIN) @@ -2064,7 +1515,6 @@ static PRInt32 pt_SendTo( if (fNeedContinue == PR_TRUE) { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = (void*)buf; op.arg3.amount = amount; @@ -2110,7 +1560,6 @@ static PRInt32 pt_RecvFrom(PRFileDesc *fd, void *buf, PRInt32 amount, if (fNeedContinue == PR_TRUE) { pt_Continuation op; - op.fd = fd; op.arg1.osfd = fd->secret->md.osfd; op.arg2.buffer = buf; op.arg3.amount = amount; @@ -2132,6 +1581,10 @@ static PRInt32 pt_RecvFrom(PRFileDesc *fd, void *buf, PRInt32 amount, } } #endif /* _PR_HAVE_SOCKADDR_LEN */ +#ifdef _PR_INET6 + if (addr && (AF_INET6 == addr->raw.family)) + addr->raw.family = PR_AF_INET6; +#endif if (bytes < 0) pt_MapError(_PR_MD_MAP_RECVFROM_ERROR, syserrno); return bytes; @@ -2238,7 +1691,6 @@ static PRInt32 pt_AIXSendFile(PRFileDesc *sd, PRSendFileData *sfd, if ((rv == 1) || ((rv == -1) && (count == 0))) { pt_Continuation op; - op.fd = sd; op.arg1.osfd = sd->secret->md.osfd; op.arg2.buffer = &sf_struct; op.arg4.flags = send_flags; @@ -2357,7 +1809,6 @@ static PRInt32 pt_HPUXSendFile(PRFileDesc *sd, PRSendFileData *sfd, hdtrl[1].iov_len = sfd->tlen - trailer_nbytes_sent; } - op.fd = sd; op.arg1.osfd = sd->secret->md.osfd; op.filedesc = sfd->fd->secret->md.osfd; op.arg2.buffer = hdtrl; @@ -2503,6 +1954,10 @@ static PRStatus pt_GetSockName(PRFileDesc *fd, PRNetAddr *addr) addr->raw.family = ((struct sockaddr*)addr)->sa_family; } #endif /* _PR_HAVE_SOCKADDR_LEN */ +#ifdef _PR_INET6 + if (AF_INET6 == addr->raw.family) + addr->raw.family = PR_AF_INET6; +#endif PR_ASSERT(IsValidNetAddr(addr) == PR_TRUE); PR_ASSERT(IsValidNetAddrLen(addr, addr_len) == PR_TRUE); return PR_SUCCESS; @@ -2530,113 +1985,16 @@ static PRStatus pt_GetPeerName(PRFileDesc *fd, PRNetAddr *addr) addr->raw.family = ((struct sockaddr*)addr)->sa_family; } #endif /* _PR_HAVE_SOCKADDR_LEN */ +#ifdef _PR_INET6 + if (AF_INET6 == addr->raw.family) + addr->raw.family = PR_AF_INET6; +#endif PR_ASSERT(IsValidNetAddr(addr) == PR_TRUE); PR_ASSERT(IsValidNetAddrLen(addr, addr_len) == PR_TRUE); return PR_SUCCESS; } } /* pt_GetPeerName */ -static PRStatus pt_GetSockOpt( - PRFileDesc *fd, PRSockOption optname, void* optval, PRInt32* optlen) -{ - PRIntn rv = -1; - PRInt32 level, name; - - if (pt_TestAbort()) return PR_FAILURE; - - /* - * PR_SockOpt_Nonblocking is a special case that does not - * translate to a getsockopt() call. - */ - if (PR_SockOpt_Nonblocking == optname) - { - PR_ASSERT(sizeof(PRIntn) <= *optlen); - *((PRIntn *) optval) = (PRIntn) fd->secret->nonblocking; - *optlen = sizeof(PRIntn); - return PR_SUCCESS; - } - - rv = _PR_MapOptionName(optname, &level, &name); - if (0 == rv) - { - if (PR_SockOpt_Linger == optname) - { - struct linger linger; - pt_SockLen len = sizeof(linger); - rv = getsockopt(fd->secret->md.osfd, level, name, - (char *) &linger, &len); - if (0 == rv) - { - ((PRLinger*)(optval))->polarity = linger.l_onoff - ? PR_TRUE : PR_FALSE; - ((PRLinger*)(optval))->linger = PR_SecondsToInterval( - linger.l_linger); - *optlen = sizeof(PRLinger); - } - } - else - { - /* Make sure the pointer type cast below is safe */ - PR_ASSERT(sizeof(PRInt32) == sizeof(PRIntn)); - rv = getsockopt(fd->secret->md.osfd, level, name, - optval, (pt_SockLen*)optlen); - } - } - - if (rv == -1) { - pt_MapError(_PR_MD_MAP_GETSOCKOPT_ERROR, errno); - return PR_FAILURE; - } else { - return PR_SUCCESS; - } -} /* pt_GetSockOpt */ - -static PRStatus pt_SetSockOpt( - PRFileDesc *fd, PRSockOption optname, const void* optval, PRInt32 optlen) -{ - PRIntn rv = -1; - PRInt32 level, name; - - if (pt_TestAbort()) return PR_FAILURE; - - /* - * PR_SockOpt_Nonblocking is a special case that does not - * translate to a setsockopt() call. - */ - if (PR_SockOpt_Nonblocking == optname) - { - PR_ASSERT(sizeof(PRIntn) == optlen); - fd->secret->nonblocking = *((PRIntn *) optval) ? PR_TRUE : PR_FALSE; - return PR_SUCCESS; - } - - rv = _PR_MapOptionName(optname, &level, &name); - if (0 == rv) - { - if (PR_SockOpt_Linger == optname) - { - struct linger linger; - linger.l_onoff = ((PRLinger*)(optval))->polarity; - linger.l_linger = PR_IntervalToSeconds( - ((PRLinger*)(optval))->linger); - rv = setsockopt(fd->secret->md.osfd, level, name, - (char *) &linger, sizeof(linger)); - } - else - { - rv = setsockopt(fd->secret->md.osfd, level, name, - optval, optlen); - } - } - - if (rv == -1) { - pt_MapError(_PR_MD_MAP_SETSOCKOPT_ERROR, errno); - return PR_FAILURE; - } else { - return PR_SUCCESS; - } -} /* pt_SetSockOpt */ - static PRStatus pt_GetSocketOption(PRFileDesc *fd, PRSocketOptionData *data) { PRIntn rv; @@ -2896,8 +2254,8 @@ static PRIOMethods _pr_file_methods = { (PRTransmitfileFN)_PR_InvalidInt, (PRGetsocknameFN)_PR_InvalidStatus, (PRGetpeernameFN)_PR_InvalidStatus, - (PRGetsockoptFN)_PR_InvalidStatus, - (PRSetsockoptFN)_PR_InvalidStatus, + (PRReservedFN)_PR_InvalidInt, + (PRReservedFN)_PR_InvalidInt, (PRGetsocketoptionFN)_PR_InvalidStatus, (PRSetsocketoptionFN)_PR_InvalidStatus, (PRSendfileFN)_PR_InvalidInt, @@ -2935,8 +2293,8 @@ static PRIOMethods _pr_pipe_methods = { (PRTransmitfileFN)_PR_InvalidInt, (PRGetsocknameFN)_PR_InvalidStatus, (PRGetpeernameFN)_PR_InvalidStatus, - (PRGetsockoptFN)_PR_InvalidStatus, - (PRSetsockoptFN)_PR_InvalidStatus, + (PRReservedFN)_PR_InvalidInt, + (PRReservedFN)_PR_InvalidInt, (PRGetsocketoptionFN)_PR_InvalidStatus, (PRSetsocketoptionFN)_PR_InvalidStatus, (PRSendfileFN)_PR_InvalidInt, @@ -2974,8 +2332,8 @@ static PRIOMethods _pr_tcp_methods = { pt_TransmitFile, pt_GetSockName, pt_GetPeerName, - pt_GetSockOpt, - pt_SetSockOpt, + (PRReservedFN)_PR_InvalidInt, + (PRReservedFN)_PR_InvalidInt, pt_GetSocketOption, pt_SetSocketOption, pt_SendFile, @@ -3013,8 +2371,8 @@ static PRIOMethods _pr_udp_methods = { (PRTransmitfileFN)_PR_InvalidInt, pt_GetSockName, pt_GetPeerName, - pt_GetSockOpt, - pt_SetSockOpt, + (PRReservedFN)_PR_InvalidInt, + (PRReservedFN)_PR_InvalidInt, pt_GetSocketOption, pt_SetSocketOption, (PRSendfileFN)_PR_InvalidInt, @@ -3025,7 +2383,6 @@ static PRIOMethods _pr_udp_methods = { (PRReservedFN)_PR_InvalidInt }; - static PRIOMethods _pr_socketpollfd_methods = { (PRDescType) 0, (PRCloseFN)_PR_InvalidStatus, @@ -3053,8 +2410,8 @@ static PRIOMethods _pr_socketpollfd_methods = { (PRTransmitfileFN)_PR_InvalidInt, (PRGetsocknameFN)_PR_InvalidStatus, (PRGetpeernameFN)_PR_InvalidStatus, - (PRGetsockoptFN)_PR_InvalidStatus, - (PRSetsockoptFN)_PR_InvalidStatus, + (PRReservedFN)_PR_InvalidInt, + (PRReservedFN)_PR_InvalidInt, (PRGetsocketoptionFN)_PR_InvalidStatus, (PRSetsocketoptionFN)_PR_InvalidStatus, (PRSendfileFN)_PR_InvalidInt, @@ -3065,10 +2422,6 @@ static PRIOMethods _pr_socketpollfd_methods = { (PRReservedFN)_PR_InvalidInt }; -#if defined(_PR_FCNTL_FLAGS) -#undef _PR_FCNTL_FLAGS -#endif - #if defined(HPUX) || defined(OSF1) || defined(SOLARIS) || defined (IRIX) \ || defined(AIX) || defined(LINUX) || defined(FREEBSD) || defined(NETBSD) \ || defined(OPENBSD) || defined(BSDI) || defined(VMS) || defined(NTO) @@ -3077,9 +2430,58 @@ static PRIOMethods _pr_socketpollfd_methods = { #error "Can't determine architecture" #endif -static PRFileDesc *pt_SetMethods(PRIntn osfd, PRDescType type) +/* + * Put a Unix file descriptor in non-blocking mode. + */ +static void pt_MakeFdNonblock(PRIntn osfd) +{ + PRIntn flags; + flags = fcntl(osfd, F_GETFL, 0); + flags |= _PR_FCNTL_FLAGS; + (void)fcntl(osfd, F_SETFL, flags); +} + +/* + * Put a Unix socket fd in non-blocking mode that can + * ideally be inherited by an accepted socket. + * + * Why doesn't pt_MakeFdNonblock do? This is to deal with + * the special case of HP-UX. HP-UX has three kinds of + * non-blocking modes for sockets: the fcntl() O_NONBLOCK + * and O_NDELAY flags and ioctl() FIOSNBIO request. Only + * the ioctl() FIOSNBIO form of non-blocking mode is + * inherited by an accepted socket. + * + * Other platforms just use the generic pt_MakeFdNonblock + * to put a socket in non-blocking mode. + */ +#ifdef HPUX +static void pt_MakeSocketNonblock(PRIntn osfd) +{ + PRIntn one = 1; + (void)ioctl(osfd, FIOSNBIO, &one); +} +#else +#define pt_MakeSocketNonblock pt_MakeFdNonblock +#endif + +#ifdef DEBUG +static void pt_AssertCloseOnExecIsCleared(PRIntn osfd) +{ + /* + * Ignore EBADF error on fd's 0, 1, 2 because they are + * not open in all processes. + */ + PRIntn flags; + flags = fcntl(osfd, F_GETFD, 0); + PR_ASSERT((0 == flags) || (-1 == flags + && (0 <= osfd && osfd <= 2) && errno == EBADF)); +} +#endif + +static PRFileDesc *pt_SetMethods( + PRIntn osfd, PRDescType type, PRBool isAcceptedSocket) { - PRInt32 flags; PRFileDesc *fd = _PR_Getfd(); if (fd == NULL) PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); @@ -3089,13 +2491,7 @@ static PRFileDesc *pt_SetMethods(PRIntn osfd, PRDescType type) fd->secret->state = _PR_FILEDESC_OPEN; /* By default, a Unix fd is not closed on exec. */ #ifdef DEBUG - /* - * Ignore EBADF error on fd's 0, 1, 2 because they are - * not open in all processes. - */ - flags = fcntl(osfd, F_GETFD, 0); - PR_ASSERT((0 == flags) || (-1 == flags - && (0 <= osfd && osfd <= 2) && errno == EBADF)); + pt_AssertCloseOnExecIsCleared(osfd); #endif fd->secret->inheritable = PR_TRUE; switch (type) @@ -3105,18 +2501,19 @@ static PRFileDesc *pt_SetMethods(PRIntn osfd, PRDescType type) break; case PR_DESC_SOCKET_TCP: fd->methods = PR_GetTCPMethods(); - flags = fcntl(osfd, F_GETFL, 0); - flags |= _PR_FCNTL_FLAGS; - (void)fcntl(osfd, F_SETFL, flags); +#ifdef _PR_ACCEPT_INHERIT_NONBLOCK + if (!isAcceptedSocket) pt_MakeSocketNonblock(osfd); +#else + pt_MakeSocketNonblock(osfd); +#endif break; case PR_DESC_SOCKET_UDP: fd->methods = PR_GetUDPMethods(); - flags = fcntl(osfd, F_GETFL, 0); - flags |= _PR_FCNTL_FLAGS; - (void)fcntl(osfd, F_SETFL, flags); + pt_MakeFdNonblock(osfd); break; case PR_DESC_PIPE: fd->methods = PR_GetPipeMethods(); + pt_MakeFdNonblock(osfd); break; default: break; @@ -3168,13 +2565,14 @@ PR_IMPLEMENT(PRFileDesc*) PR_AllocFileDesc( if (osfd > 2) { /* Don't mess around with stdin, stdout or stderr */ - PRIntn flags; - flags = fcntl(osfd, F_GETFL, 0); - fcntl(osfd, F_SETFL, flags | _PR_FCNTL_FLAGS); + if (&_pr_tcp_methods == methods) pt_MakeSocketNonblock(osfd); + else pt_MakeFdNonblock(osfd); } fd->secret->state = _PR_FILEDESC_OPEN; /* By default, a Unix fd is not closed on exec. */ - PR_ASSERT(0 == fcntl(osfd, F_GETFD, 0)); +#ifdef DEBUG + pt_AssertCloseOnExecIsCleared(osfd); +#endif fd->secret->inheritable = PR_TRUE; return fd; @@ -3183,40 +2581,91 @@ failed: return fd; } /* PR_AllocFileDesc */ +#if !defined(_PR_INET6) || defined(_PR_INET6_PROBE) +PR_EXTERN(PRStatus) _pr_push_ipv6toipv4_layer(PRFileDesc *fd); +#if defined(_PR_INET6_PROBE) +PR_EXTERN(PRBool) _pr_ipv6_is_present; +PR_IMPLEMENT(PRBool) _pr_test_ipv6_socket() +{ +PRInt32 osfd; + + osfd = socket(AF_INET6, SOCK_STREAM, 0); + if (osfd != -1) { + close(osfd); + return PR_TRUE; + } + return PR_FALSE; +} +#endif /* _PR_INET6_PROBE */ +#endif + PR_IMPLEMENT(PRFileDesc*) PR_Socket(PRInt32 domain, PRInt32 type, PRInt32 proto) { PRIntn osfd; PRDescType ftype; PRFileDesc *fd = NULL; + PRInt32 tmp_domain = domain; if (!_pr_initialized) _PR_ImplicitInitialization(); if (pt_TestAbort()) return NULL; if (PF_INET != domain -#if defined(_PR_INET6) - && PF_INET6 != domain -#endif + && PR_AF_INET6 != domain && PF_UNIX != domain) { PR_SetError(PR_ADDRESS_NOT_SUPPORTED_ERROR, 0); return fd; } - if (type == SOCK_STREAM) ftype = PR_DESC_SOCKET_TCP; - else if (type == SOCK_DGRAM) ftype = PR_DESC_SOCKET_UDP; - else - { - (void)PR_SetError(PR_ADDRESS_NOT_SUPPORTED_ERROR, 0); - return fd; - } + if (type == SOCK_STREAM) ftype = PR_DESC_SOCKET_TCP; + else if (type == SOCK_DGRAM) ftype = PR_DESC_SOCKET_UDP; + else + { + (void)PR_SetError(PR_ADDRESS_NOT_SUPPORTED_ERROR, 0); + return fd; + } +#if defined(_PR_INET6) + if (PR_AF_INET6 == domain) { +#if defined(_PR_INET6_PROBE) + if (_pr_ipv6_is_present == PR_FALSE) + domain = AF_INET; + else +#endif + domain = AF_INET6; + } +#elif defined(_PR_INET6_PROBE) + if (PR_AF_INET6 == domain) { + if (_pr_ipv6_is_present == PR_FALSE) + domain = AF_INET; + else + domain = AF_INET6; + } +#else + if (PR_AF_INET6 == domain) + domain = AF_INET; +#endif osfd = socket(domain, type, proto); if (osfd == -1) pt_MapError(_PR_MD_MAP_SOCKET_ERROR, errno); else { - fd = pt_SetMethods(osfd, ftype); + fd = pt_SetMethods(osfd, ftype, PR_FALSE); if (fd == NULL) close(osfd); } +#if !defined(_PR_INET6) || defined(_PR_INET6_PROBE) + if (fd != NULL) { + /* + * For platforms with no support for IPv6 + * create layered socket for IPv4-mapped IPv6 addresses + */ + if (PR_AF_INET6 == tmp_domain && PR_AF_INET == domain) { + if (PR_FAILURE == _pr_push_ipv6toipv4_layer(fd)) { + PR_Close(fd); + fd = NULL; + } + } + } +#endif return fd; } /* PR_Socket */ @@ -3224,7 +2673,8 @@ PR_IMPLEMENT(PRFileDesc*) PR_Socket(PRInt32 domain, PRInt32 type, PRInt32 proto) /****************************** I/O public methods ***************************/ /*****************************************************************************/ -PR_IMPLEMENT(PRFileDesc*) PR_Open(const char *name, PRIntn flags, PRIntn mode) +PR_IMPLEMENT(PRFileDesc*) PR_OpenFile( + const char *name, PRIntn flags, PRIntn mode) { PRFileDesc *fd = NULL; PRIntn syserrno, osfd = -1, osflags = 0;; @@ -3238,6 +2688,17 @@ PR_IMPLEMENT(PRFileDesc*) PR_Open(const char *name, PRIntn flags, PRIntn mode) if (flags & PR_RDWR) osflags |= O_RDWR; if (flags & PR_APPEND) osflags |= O_APPEND; if (flags & PR_TRUNCATE) osflags |= O_TRUNC; + if (flags & PR_EXCL) osflags |= O_EXCL; + if (flags & PR_SYNC) + { +#if defined(O_SYNC) + osflags |= O_SYNC; +#elif defined(O_FSYNC) + osflags |= O_FSYNC; +#else +#error "Neither O_SYNC nor O_FSYNC is defined on this platform" +#endif + } /* ** We have to hold the lock across the creation in order to @@ -3261,10 +2722,15 @@ PR_IMPLEMENT(PRFileDesc*) PR_Open(const char *name, PRIntn flags, PRIntn mode) pt_MapError(_PR_MD_MAP_OPEN_ERROR, syserrno); else { - fd = pt_SetMethods(osfd, PR_DESC_FILE); + fd = pt_SetMethods(osfd, PR_DESC_FILE, PR_FALSE); if (fd == NULL) close(osfd); /* $$$ whoops! this is bad $$$ */ } return fd; +} /* PR_OpenFile */ + +PR_IMPLEMENT(PRFileDesc*) PR_Open(const char *name, PRIntn flags, PRIntn mode) +{ + return PR_OpenFile(name, flags, mode); } /* PR_Open */ PR_IMPLEMENT(PRStatus) PR_Delete(const char *name) @@ -3371,7 +2837,7 @@ PR_IMPLEMENT(PRStatus) PR_CloseDir(PRDir *dir) return PR_SUCCESS; } /* PR_CloseDir */ -PR_IMPLEMENT(PRStatus) PR_MkDir(const char *name, PRIntn mode) +PR_IMPLEMENT(PRStatus) PR_MakeDir(const char *name, PRIntn mode) { PRInt32 rv = -1; @@ -3390,6 +2856,11 @@ PR_IMPLEMENT(PRStatus) PR_MkDir(const char *name, PRIntn mode) PR_Unlock(_pr_rename_lock); return (-1 == rv) ? PR_FAILURE : PR_SUCCESS; +} /* PR_Makedir */ + +PR_IMPLEMENT(PRStatus) PR_MkDir(const char *name, PRIntn mode) +{ + return PR_MakeDir(name, mode); } /* PR_Mkdir */ PR_IMPLEMENT(PRStatus) PR_RmDir(const char *name) @@ -3694,10 +3165,6 @@ PR_IMPLEMENT(PRFileDesc*) PR_NewUDPSocket() { PRIntn domain = PF_INET; -#if defined(_PR_INET6) - if (_pr_ipv6_enabled) - domain = PF_INET6; -#endif return PR_Socket(domain, SOCK_DGRAM, 0); } /* PR_NewUDPSocket */ @@ -3705,10 +3172,6 @@ PR_IMPLEMENT(PRFileDesc*) PR_NewTCPSocket() { PRIntn domain = PF_INET; -#if defined(_PR_INET6) - if (_pr_ipv6_enabled) - domain = PF_INET6; -#endif return PR_Socket(domain, SOCK_STREAM, 0); } /* PR_NewTCPSocket */ @@ -3733,13 +3196,13 @@ PR_IMPLEMENT(PRStatus) PR_NewTCPSocketPair(PRFileDesc *fds[2]) return PR_FAILURE; } - fds[0] = pt_SetMethods(osfd[0], PR_DESC_SOCKET_TCP); + fds[0] = pt_SetMethods(osfd[0], PR_DESC_SOCKET_TCP, PR_FALSE); if (fds[0] == NULL) { close(osfd[0]); close(osfd[1]); return PR_FAILURE; } - fds[1] = pt_SetMethods(osfd[1], PR_DESC_SOCKET_TCP); + fds[1] = pt_SetMethods(osfd[1], PR_DESC_SOCKET_TCP, PR_FALSE); if (fds[1] == NULL) { PR_Close(fds[0]); close(osfd[1]); @@ -3754,7 +3217,6 @@ PR_IMPLEMENT(PRStatus) PR_CreatePipe( ) { int pipefd[2]; - int flags; if (pt_TestAbort()) return PR_FAILURE; @@ -3764,26 +3226,20 @@ PR_IMPLEMENT(PRStatus) PR_CreatePipe( PR_SetError(PR_UNKNOWN_ERROR, errno); return PR_FAILURE; } - *readPipe = pt_SetMethods(pipefd[0], PR_DESC_PIPE); + *readPipe = pt_SetMethods(pipefd[0], PR_DESC_PIPE, PR_FALSE); if (NULL == *readPipe) { close(pipefd[0]); close(pipefd[1]); return PR_FAILURE; } - flags = fcntl(pipefd[0], F_GETFL, 0); - flags |= _PR_FCNTL_FLAGS; - (void)fcntl(pipefd[0], F_SETFL, flags); - *writePipe = pt_SetMethods(pipefd[1], PR_DESC_PIPE); + *writePipe = pt_SetMethods(pipefd[1], PR_DESC_PIPE, PR_FALSE); if (NULL == *writePipe) { PR_Close(*readPipe); close(pipefd[1]); return PR_FAILURE; } - flags = fcntl(pipefd[1], F_GETFL, 0); - flags |= _PR_FCNTL_FLAGS; - (void)fcntl(pipefd[1], F_SETFL, flags); return PR_SUCCESS; } @@ -3824,7 +3280,7 @@ PR_IMPLEMENT(PRFileDesc*) PR_ImportFile(PRInt32 osfd) PRFileDesc *fd; if (!_pr_initialized) _PR_ImplicitInitialization(); - fd = pt_SetMethods(osfd, PR_DESC_FILE); + fd = pt_SetMethods(osfd, PR_DESC_FILE, PR_FALSE); if (NULL == fd) close(osfd); return fd; } /* PR_ImportFile */ @@ -3834,7 +3290,7 @@ PR_IMPLEMENT(PRFileDesc*) PR_ImportTCPSocket(PRInt32 osfd) PRFileDesc *fd; if (!_pr_initialized) _PR_ImplicitInitialization(); - fd = pt_SetMethods(osfd, PR_DESC_SOCKET_TCP); + fd = pt_SetMethods(osfd, PR_DESC_SOCKET_TCP, PR_FALSE); if (NULL == fd) close(osfd); return fd; } /* PR_ImportTCPSocket */ @@ -3844,7 +3300,7 @@ PR_IMPLEMENT(PRFileDesc*) PR_ImportUDPSocket(PRInt32 osfd) PRFileDesc *fd; if (!_pr_initialized) _PR_ImplicitInitialization(); - fd = pt_SetMethods(osfd, PR_DESC_SOCKET_UDP); + fd = pt_SetMethods(osfd, PR_DESC_SOCKET_UDP, PR_FALSE); if (NULL != fd) close(osfd); return fd; } /* PR_ImportUDPSocket */ |