diff options
author | sfraser%netscape.com <devnull@localhost> | 2001-10-09 02:12:40 +0000 |
---|---|---|
committer | sfraser%netscape.com <devnull@localhost> | 2001-10-09 02:12:40 +0000 |
commit | 64ffbb4adafc1f7a3cc8cc59cf5f83aee4b5c075 (patch) | |
tree | 1f472b9bcf439920936426f27ed76b261d869c22 | |
parent | d4a550b245365a002c3ead490808fdedaa7b4648 (diff) | |
download | nspr-hg-64ffbb4adafc1f7a3cc8cc59cf5f83aee4b5c075.tar.gz |
Fix for 100353 -- handle polling for read and write on different threads, and properly deal with sending to a socket that has flow control restrictions. Fixes a problem saving to IMAP server sent messages with attachments. r=gordon, wtc.
-rw-r--r-- | pr/include/md/_macos.h | 3 | ||||
-rw-r--r-- | pr/src/md/mac/macsockotpt.c | 301 |
2 files changed, 173 insertions, 131 deletions
diff --git a/pr/include/md/_macos.h b/pr/include/md/_macos.h index 565cb94a..caf73cb9 100644 --- a/pr/include/md/_macos.h +++ b/pr/include/md/_macos.h @@ -120,11 +120,10 @@ struct _MDFileDesc { /* Server sockets: listen bit tells the notifier func what to do */ PRBool doListen; - + _MDSocketCallerInfo misc; _MDSocketCallerInfo read; _MDSocketCallerInfo write; - _MDSocketCallerInfo poll; }; /* diff --git a/pr/src/md/mac/macsockotpt.c b/pr/src/md/mac/macsockotpt.c index 9eccf69c..ef15a56b 100644 --- a/pr/src/md/mac/macsockotpt.c +++ b/pr/src/md/mac/macsockotpt.c @@ -305,92 +305,89 @@ WakeUpNotifiedThread(PRThread *thread, OTResult result) // Notification routine // Async callback routine. // A5 is OK. Cannot allocate memory here +// Ref: http://gemma.apple.com/techpubs/mac/NetworkingOT/NetworkingWOT-100.html +// static pascal void NotifierRoutine(void * contextPtr, OTEventCode code, OTResult result, void * cookie) { - PRFilePrivate *secret = (PRFilePrivate *) contextPtr; - _MDFileDesc * md = &(secret->md); - EndpointRef endpoint = (EndpointRef)secret->md.osfd; - PRThread * thread = NULL; - PRThread * pollThread = md->poll.thread; - OSStatus err; - OTResult resultOT; - TDiscon discon; + PRFilePrivate *secret = (PRFilePrivate *) contextPtr; + _MDFileDesc * md = &(secret->md); + EndpointRef endpoint = (EndpointRef)secret->md.osfd; + PRThread * readThread = NULL; // also used for 'misc' + PRThread * writeThread = NULL; + OSStatus err; + OTResult resultOT; + TDiscon discon; switch (code) { // OTLook Events - case T_LISTEN: // A connection request is available - // If md->doListen is true, then PR_Listen has been - // called on this endpoint; therefore, we're ready to - // accept connections. But we'll do that with PR_Accept - // (which calls OTListen, OTAccept, etc) instead of - // doing it here. - if (md->doListen) { - thread = secret->md.misc.thread; - secret->md.misc.thread = NULL; - secret->md.misc.cookie = cookie; - break; - } else { - // Reject the connection, we're not listening - OTSndDisconnect(endpoint, NULL); - } + // If md->doListen is true, then PR_Listen has been + // called on this endpoint; therefore, we're ready to + // accept connections. But we'll do that with PR_Accept + // (which calls OTListen, OTAccept, etc) instead of + // doing it here. + if (md->doListen) { + readThread = secret->md.misc.thread; + secret->md.misc.thread = NULL; + secret->md.misc.cookie = cookie; + break; + } else { + // Reject the connection, we're not listening + OTSndDisconnect(endpoint, NULL); + } break; - + case T_CONNECT: // Confirmation of a connect request - // cookie = sndCall parameter from OTConnect() + // cookie = sndCall parameter from OTConnect() err = OTRcvConnect(endpoint, NULL); PR_ASSERT(err == kOTNoError); - // wake up waiting thread, if any - thread = secret->md.write.thread; + // wake up waiting thread, if any. + writeThread = secret->md.write.thread; secret->md.write.thread = NULL; - secret->md.write.cookie = cookie; + secret->md.write.cookie = cookie; break; case T_DATA: // Standard data is available - // Mark this socket as readable. - secret->md.readReady = PR_TRUE; + // Mark this socket as readable. + secret->md.readReady = PR_TRUE; - // wake up waiting thread, if any - thread = secret->md.read.thread; + // wake up waiting thread, if any + readThread = secret->md.read.thread; secret->md.read.thread = NULL; secret->md.read.cookie = cookie; - break; + break; case T_EXDATA: // Expedited data is available PR_ASSERT(!"T_EXDATA Not implemented"); - return; + return; case T_DISCONNECT: // A disconnect is available discon.udata.len = 0; err = OTRcvDisconnect(endpoint, &discon); PR_ASSERT(err == kOTNoError); - secret->md.exceptReady = PR_TRUE; + secret->md.exceptReady = PR_TRUE; // XXX Check this - // wake up waiting threads, if any - result = -3199 - discon.reason; // obtain the negative error code + // wake up waiting threads, if any + result = -3199 - discon.reason; // obtain the negative error code + if ((readThread = secret->md.read.thread) != NULL) { + secret->md.read.thread = NULL; + secret->md.read.cookie = cookie; + } - if ((thread = secret->md.read.thread) != NULL) { - secret->md.read.thread = NULL; - secret->md.read.cookie = cookie; - WakeUpNotifiedThread(thread, result); - } - - if ((thread = secret->md.write.thread) != NULL) { - secret->md.write.thread = NULL; - secret->md.write.cookie = cookie; - WakeUpNotifiedThread(thread, result); - } - - thread = NULL; // already took care of notification here + if ((writeThread = secret->md.write.thread) != NULL) { + secret->md.write.thread = NULL; + secret->md.write.cookie = cookie; + } break; - + case T_ERROR: // obsolete/unused in library PR_ASSERT(!"T_ERROR Not implemented"); - return; - + return; + case T_UDERR: // UDP Send error; clear the error - (void) OTRcvUDErr((EndpointRef) cookie, NULL); + (void) OTRcvUDErr((EndpointRef) cookie, NULL); break; case T_ORDREL: // An orderly release is available @@ -398,28 +395,27 @@ static pascal void NotifierRoutine(void * contextPtr, OTEventCode code, OTResul PR_ASSERT(err == kOTNoError); secret->md.readReady = PR_TRUE; // mark readable (to emulate bsd sockets) // remember connection is closed, so we can return 0 on read or receive - secret->md.orderlyDisconnect = PR_TRUE; - - thread = secret->md.read.thread; - secret->md.read.thread = NULL; - secret->md.read.cookie = cookie; - + secret->md.orderlyDisconnect = PR_TRUE; + + readThread = secret->md.read.thread; + secret->md.read.thread = NULL; + secret->md.read.cookie = cookie; break; case T_GODATA: // Flow control lifted on standard data secret->md.writeReady = PR_TRUE; - resultOT = OTLook(endpoint); // clear T_GODATA event - PR_ASSERT(resultOT == T_GODATA); - - // wake up waiting thread, if any - thread = secret->md.write.thread; + resultOT = OTLook(endpoint); // clear T_GODATA event + PR_ASSERT(resultOT == T_GODATA); + + // wake up waiting thread, if any + writeThread = secret->md.write.thread; secret->md.write.thread = NULL; secret->md.write.cookie = cookie; break; case T_GOEXDATA: // Flow control lifted on expedited data PR_ASSERT(!"T_GOEXDATA Not implemented"); - return; + return; case T_REQUEST: // An Incoming request is available PR_ASSERT(!"T_REQUEST Not implemented"); @@ -430,13 +426,13 @@ static pascal void NotifierRoutine(void * contextPtr, OTEventCode code, OTResul return; case T_PASSCON: // State is now T_DATAXFER - // OTAccept() complete, receiving endpoint in T_DATAXFER state - // cookie = OTAccept() resRef parameter - break; + // OTAccept() complete, receiving endpoint in T_DATAXFER state + // cookie = OTAccept() resRef parameter + break; case T_RESET: // Protocol has been reset PR_ASSERT(!"T_RESET Not implemented"); - return; + return; // Async Completion Events case T_BINDCOMPLETE: @@ -444,39 +440,39 @@ static pascal void NotifierRoutine(void * contextPtr, OTEventCode code, OTResul case T_ACCEPTCOMPLETE: case T_OPTMGMTCOMPLETE: case T_GETPROTADDRCOMPLETE: - thread = secret->md.misc.thread; + readThread = secret->md.misc.thread; secret->md.misc.thread = NULL; secret->md.misc.cookie = cookie; break; -// case T_OPENCOMPLETE: // we open endpoints in synchronous mode +// case T_OPENCOMPLETE: // we open endpoints in synchronous mode // case T_REPLYCOMPLETE: -// case T_DISCONNECTCOMPLETE: // we don't call OTSndDisconnect() +// case T_DISCONNECTCOMPLETE: // we don't call OTSndDisconnect() // case T_RESOLVEADDRCOMPLETE: // case T_GETINFOCOMPLETE: // case T_SYNCCOMPLETE: -// case T_MEMORYRELEASED: // only if OTAckSends() called on endpoint +// case T_MEMORYRELEASED: // only if OTAckSends() called on endpoint // case T_REGNAMECOMPLETE: // case T_DELNAMECOMPLETE: // case T_LKUPNAMECOMPLETE: // case T_LKUPNAMERESULT: - // OpenTptInternet.h -// case T_DNRSTRINGTOADDRCOMPLETE: // DNS is handled by dnsContext in DNSNotifierRoutine() + // OpenTptInternet.h +// case T_DNRSTRINGTOADDRCOMPLETE: // DNS is handled by dnsContext in DNSNotifierRoutine() // case T_DNRADDRTONAMECOMPLETE: // case T_DNRSYSINFOCOMPLETE: // case T_DNRMAILEXCHANGECOMPLETE: // case T_DNRQUERYCOMPLETE: default: - // we should probably have a bit more sophisticated handling of kOTSystemSleep, etc. - // PR_ASSERT(code != 0); + // we should probably have a bit more sophisticated handling of kOTSystemSleep, etc. + // PR_ASSERT(code != 0); return; } - if (pollThread) - WakeUpNotifiedThread(pollThread, kOTNoError); + if (readThread) + WakeUpNotifiedThread(readThread, result); - if (thread && (thread != pollThread)) - WakeUpNotifiedThread(thread, result); + if (writeThread && (writeThread != readThread)) + WakeUpNotifiedThread(writeThread, result); } @@ -488,8 +484,8 @@ static OSErr CreateSocket(int type, EndpointRef *endpoint) OTConfiguration *config; EndpointRef ep; - // for now we just create the endpoint - // we'll make it asynchronous and give it a notifier routine in _MD_makenonblock() + // for now we just create the endpoint + // we'll make it asynchronous and give it a notifier routine in _MD_makenonblock() switch (type){ case SOCK_STREAM: configName = kTCPName; break; @@ -519,7 +515,7 @@ PRInt32 _MD_socket(int domain, int type, int protocol) OSStatus err; EndpointRef endpoint; - _MD_FinishInitNetAccess(); + _MD_FinishInitNetAccess(); // We only deal with internet domain if (domain != AF_INET) { @@ -1349,7 +1345,8 @@ PRInt32 _MD_connect(PRFileDesc *fd, PRNetAddr *addr, PRUint32 addrlen, PRInterva sndCall.addr.buf = (UInt8*) addr; if (!fd->secret->nonblocking) { - PrepareForAsyncCompletion(me, fd->secret->md.osfd); + PrepareForAsyncCompletion(me, fd->secret->md.osfd); + PR_ASSERT(fd->secret->md.write.thread == NULL); fd->secret->md.write.thread = me; } @@ -1407,7 +1404,10 @@ static PRInt32 SendReceiveStream(PRFileDesc *fd, void *buf, PRInt32 amount, err = kEFAULTErr; goto ErrorExit; } - + + PR_ASSERT(opCode == kSTREAM_SEND ? fd->secret->md.write.thread == NULL : + fd->secret->md.read.thread == NULL); + while (bytesLeft > 0) { Boolean disabledNotifications = OTEnterNotifier(endpoint); @@ -1416,7 +1416,6 @@ static PRInt32 SendReceiveStream(PRFileDesc *fd, void *buf, PRInt32 amount, if (opCode == kSTREAM_SEND) { do { - fd->secret->md.write.thread = me; fd->secret->md.writeReady = PR_FALSE; // expect the worst result = OTSnd(endpoint, buf, bytesLeft, NULL); @@ -1500,8 +1499,10 @@ static PRInt32 SendReceiveStream(PRFileDesc *fd, void *buf, PRInt32 amount, if (result > 0) { buf = (void *) ( (UInt32) buf + (UInt32)result ); bytesLeft -= result; - if (opCode == kSTREAM_RECEIVE) - return result; + if (opCode == kSTREAM_RECEIVE) { + amount = result; + goto NormalExit; + } } else { switch (result) { case kOTLookErr: @@ -1513,8 +1514,15 @@ static PRInt32 SendReceiveStream(PRFileDesc *fd, void *buf, PRInt32 amount, case kEAGAINErr: case kEWOULDBLOCKErr: if (fd->secret->nonblocking) { - err = result; - goto ErrorExit; + + if (bytesLeft == amount) { // no data was sent + err = result; + goto ErrorExit; + } + + // some data was sent + amount -= bytesLeft; + goto NormalExit; } WaitOnThisThread(me, timeout); @@ -1524,8 +1532,11 @@ static PRInt32 SendReceiveStream(PRFileDesc *fd, void *buf, PRInt32 amount, break; case kOTOutStateErr: // if provider already closed, fall through to handle error - if (fd->secret->md.orderlyDisconnect) - return 0; + if (fd->secret->md.orderlyDisconnect) { + amount = 0; + goto NormalExit; + } + // else fall through default: err = result; goto ErrorExit; @@ -1533,30 +1544,31 @@ static PRInt32 SendReceiveStream(PRFileDesc *fd, void *buf, PRInt32 amount, } } - PR_ASSERT(opCode == kSTREAM_SEND ? fd->secret->md.write.thread == nil : - fd->secret->md.read.thread == nil); +NormalExit: + PR_ASSERT(opCode == kSTREAM_SEND ? fd->secret->md.write.thread == NULL : + fd->secret->md.read.thread == NULL); return amount; ErrorExit: - PR_ASSERT(opCode == kSTREAM_SEND ? fd->secret->md.write.thread == nil : - fd->secret->md.read.thread == nil); + PR_ASSERT(opCode == kSTREAM_SEND ? fd->secret->md.write.thread == NULL : + fd->secret->md.read.thread == NULL); macsock_map_error(err); return -1; -} +} PRInt32 _MD_recv(PRFileDesc *fd, void *buf, PRInt32 amount, PRIntn flags, PRIntervalTime timeout) { return (SendReceiveStream(fd, buf, amount, flags, timeout, kSTREAM_RECEIVE)); -} +} PRInt32 _MD_send(PRFileDesc *fd,const void *buf, PRInt32 amount, PRIntn flags, PRIntervalTime timeout) { return (SendReceiveStream(fd, (void *)buf, amount, flags, timeout, kSTREAM_SEND)); -} +} // Errors: @@ -1638,7 +1650,7 @@ static PRInt32 SendReceiveDgram(PRFileDesc *fd, void *buf, PRInt32 amount, ErrorExit: macsock_map_error(err); return -1; -} +} PRInt32 _MD_recvfrom(PRFileDesc *fd, void *buf, PRInt32 amount, @@ -1647,7 +1659,7 @@ PRInt32 _MD_recvfrom(PRFileDesc *fd, void *buf, PRInt32 amount, { return (SendReceiveDgram(fd, buf, amount, flags, addr, addrlen, timeout, kDGRAM_RECEIVE)); -} +} PRInt32 _MD_sendto(PRFileDesc *fd,const void *buf, PRInt32 amount, @@ -1656,7 +1668,7 @@ PRInt32 _MD_sendto(PRFileDesc *fd,const void *buf, PRInt32 amount, { return (SendReceiveDgram(fd, (void *)buf, amount, flags, addr, &addrlen, timeout, kDGRAM_SEND)); -} +} PRInt32 _MD_closesocket(PRInt32 osfd) @@ -1683,7 +1695,7 @@ PRInt32 _MD_closesocket(PRInt32 osfd) ErrorExit: macsock_map_error(err); return -1; -} +} PRInt32 _MD_writev(PRFileDesc *fd, const struct PRIOVec *iov, PRInt32 iov_size, PRIntervalTime timeout) @@ -1693,9 +1705,11 @@ PRInt32 _MD_writev(PRFileDesc *fd, const struct PRIOVec *iov, PRInt32 iov_size, PR_ASSERT(0); _PR_MD_CURRENT_THREAD()->md.osErrCode = unimpErr; return -1; -} - +} +// OT endpoint states are documented here: +// http://gemma.apple.com/techpubs/mac/NetworkingOT/NetworkingWOT-27.html#MARKER-9-65 +// static PRBool GetState(PRFileDesc *fd, PRBool *readReady, PRBool *writeReady, PRBool *exceptReady) { OTResult resultOT; @@ -1706,14 +1720,32 @@ static PRBool GetState(PRFileDesc *fd, PRBool *readReady, PRBool *writeReady, PR OTCountDataBytes((EndpointRef)fd->secret->md.osfd, &availableData); *readReady = fd->secret->md.readReady && (availableData > 0); - *exceptReady = fd->secret->md.exceptReady; + *exceptReady = fd->secret->md.exceptReady; resultOT = OTGetEndpointState((EndpointRef)fd->secret->md.osfd); - switch (resultOT) { - case T_DATAXFER: - case T_INREL: - *writeReady = PR_TRUE; + switch (resultOT) { + case T_IDLE: + case T_UNBND: + // the socket is not connected. Emulating BSD sockets, + // we mark it readable and writable. The next PR_Read + // or PR_Write will then fail. Usually, in this situation, + // fd->secret->md.exceptReady is also set, and returned if + // anyone is polling for it. + *readReady = PR_FALSE; + *writeReady = PR_FALSE; + break; + + case T_DATAXFER: // data transfer + *writeReady = fd->secret->md.writeReady; break; + + case T_INREL: // incoming orderly release + *writeReady = fd->secret->md.writeReady; + break; + + case T_OUTCON: // outgoing connection pending + case T_INCON: // incoming connection pending + case T_OUTREL: // outgoing orderly release default: *writeReady = PR_FALSE; } @@ -1811,7 +1843,24 @@ static void SetDescPollThread(PRPollDesc *pds, PRIntn npds, PRThread* thread) PRFileDesc *bottomFD = PR_GetIdentitiesLayer(pd->fd, PR_NSPR_IO_LAYER); if (bottomFD && (_PR_FILEDESC_OPEN == bottomFD->secret->state)) { - bottomFD->secret->md.poll.thread = thread; + if (pd->in_flags & PR_POLL_READ) { + PR_ASSERT(thread == NULL || bottomFD->secret->md.read.thread == NULL); + bottomFD->secret->md.read.thread = thread; + } + + if (pd->in_flags & PR_POLL_WRITE) { + // it's possible for the writing thread to be non-null during + // a non-blocking connect, so we assert that we're on + // the same thread, or the thread is null. + // Note that it's strictly possible for the connect and poll + // to be on different threads, so ideally we need to assert + // that if md.write.thread is non-null, there is a non-blocking + // connect in progress. + PR_ASSERT(thread == NULL || + (bottomFD->secret->md.write.thread == NULL || + bottomFD->secret->md.write.thread == thread)); + bottomFD->secret->md.write.thread = thread; + } } } } @@ -1822,9 +1871,8 @@ PRInt32 _MD_poll(PRPollDesc *pds, PRIntn npds, PRIntervalTime timeout) PRThread *thread = _PR_MD_CURRENT_THREAD(); intn is; PRInt32 ready; - OSErr result; - if (timeout == PR_INTERVAL_NO_WAIT) { + if (timeout == PR_INTERVAL_NO_WAIT) { return CheckPollDescs(pds, npds); } @@ -1835,7 +1883,7 @@ PRInt32 _MD_poll(PRPollDesc *pds, PRIntn npds, PRIntervalTime timeout) // need to set up the thread PrepareForAsyncCompletion(thread, 0); - SetDescPollThread(pds, npds, thread); + SetDescPollThread(pds, npds, thread); ready = CheckPollDescs(pds, npds); PR_Unlock(thread->md.asyncIOLock); @@ -1843,13 +1891,8 @@ PRInt32 _MD_poll(PRPollDesc *pds, PRIntn npds, PRIntervalTime timeout) if (ready == 0) { WaitOnThisThread(thread, timeout); - result = thread->md.osErrCode; - if (result != noErr && result != kETIMEDOUTErr) { - PR_ASSERT(0); /* debug: catch unexpected errors */ - ready = -1; - } else { - ready = CheckPollDescs(pds, npds); - } + ready = CheckPollDescs(pds, npds); + } else { thread->io_pending = PR_FALSE; } @@ -1937,7 +1980,7 @@ PR_IMPLEMENT(PRInt32) _MD_shutdown(PRFileDesc *fd, PRIntn how) /* Just succeed silently!!! */ return (0); -} +} PR_IMPLEMENT(PRStatus) @@ -1987,7 +2030,7 @@ _MD_getpeername(PRFileDesc *fd, PRNetAddr *addr, PRUint32 *addrlen) ErrorExit: macsock_map_error(err); return PR_FAILURE; -} +} PR_IMPLEMENT(unsigned long) inet_addr(const char *cp) @@ -1995,7 +2038,7 @@ PR_IMPLEMENT(unsigned long) inet_addr(const char *cp) OSStatus err; InetHost host; - _MD_FinishInitNetAccess(); + _MD_FinishInitNetAccess(); err = OTInetStringToHost((char*) cp, &host); if (err != kOTNoError) @@ -2067,7 +2110,7 @@ PR_IMPLEMENT(struct hostent *) gethostbyaddr(const void *addr, int addrlen, int PR_IMPLEMENT(char *) inet_ntoa(struct in_addr addr) { - _MD_FinishInitNetAccess(); + _MD_FinishInitNetAccess(); OTInetHostToString((InetHost)addr.s_addr, sHostInfo.name); @@ -2080,7 +2123,7 @@ PRStatus _MD_gethostname(char *name, int namelen) OSStatus err; InetInterfaceInfo info; - _MD_FinishInitNetAccess(); + _MD_FinishInitNetAccess(); /* * On a Macintosh, we don't have the concept of a local host name. @@ -2164,8 +2207,8 @@ int _MD_mac_get_nonblocking_connect_error(PRInt32 osfd) case T_IDLE: return -1; case T_INREL: - macsock_map_error(ENOTCONN); - return -1; + macsock_map_error(ENOTCONN); + return -1; default: PR_ASSERT(0); return -1; |