diff options
Diffstat (limited to 'libgo/go/net/fd_windows.go')
-rw-r--r-- | libgo/go/net/fd_windows.go | 202 |
1 files changed, 105 insertions, 97 deletions
diff --git a/libgo/go/net/fd_windows.go b/libgo/go/net/fd_windows.go index fd50d772d6..a976f2ac7f 100644 --- a/libgo/go/net/fd_windows.go +++ b/libgo/go/net/fd_windows.go @@ -5,12 +5,12 @@ package net import ( + "context" "internal/race" "os" "runtime" "sync" "syscall" - "time" "unsafe" ) @@ -42,11 +42,6 @@ func sysInit() { initErr = os.NewSyscallError("wsastartup", e) } canCancelIO = syscall.LoadCancelIoEx() == nil - if syscall.LoadGetAddrInfo() == nil { - lookupPort = newLookupPort - lookupIP = newLookupIP - } - hasLoadSetFileCompletionNotificationModes = syscall.LoadSetFileCompletionNotificationModes() == nil if hasLoadSetFileCompletionNotificationModes { // It's not safe to use FILE_SKIP_COMPLETION_PORT_ON_SUCCESS if non IFS providers are installed: @@ -69,22 +64,15 @@ func sysInit() { } } +// canUseConnectEx reports whether we can use the ConnectEx Windows API call +// for the given network type. func canUseConnectEx(net string) bool { switch net { - case "udp", "udp4", "udp6", "ip", "ip4", "ip6": - // ConnectEx windows API does not support connectionless sockets. - return false - } - return syscall.LoadConnectEx() == nil -} - -func dial(net string, ra Addr, dialer func(time.Time) (Conn, error), deadline time.Time) (Conn, error) { - if !canUseConnectEx(net) { - // Use the relatively inefficient goroutine-racing - // implementation of DialTimeout. - return dialChannel(net, ra, dialer, deadline) + case "tcp", "tcp4", "tcp6": + return true } - return dialer(deadline) + // ConnectEx windows API does not support connectionless sockets. + return false } // operation contains superset of data necessary to perform all async IO. @@ -108,6 +96,7 @@ type operation struct { rsan int32 handle syscall.Handle flags uint32 + bufs []syscall.WSABuf } func (o *operation) InitBuf(buf []byte) { @@ -118,6 +107,30 @@ func (o *operation) InitBuf(buf []byte) { } } +func (o *operation) InitBufs(buf *Buffers) { + if o.bufs == nil { + o.bufs = make([]syscall.WSABuf, 0, len(*buf)) + } else { + o.bufs = o.bufs[:0] + } + for _, b := range *buf { + var p *byte + if len(b) > 0 { + p = &b[0] + } + o.bufs = append(o.bufs, syscall.WSABuf{Len: uint32(len(b)), Buf: p}) + } +} + +// ClearBufs clears all pointers to Buffers parameter captured +// by InitBufs, so it can be released by garbage collector. +func (o *operation) ClearBufs() { + for i := range o.bufs { + o.bufs[i].Buf = nil + } + o.bufs = o.bufs[:0] +} + // ioSrv executes net IO requests. type ioSrv struct { req chan ioSrvReq @@ -152,7 +165,7 @@ func (s *ioSrv) ProcessRemoteIO() { func (s *ioSrv) ExecIO(o *operation, name string, submit func(o *operation) error) (int, error) { fd := o.fd // Notify runtime netpoll about starting IO. - err := fd.pd.Prepare(int(o.mode)) + err := fd.pd.prepare(int(o.mode)) if err != nil { return 0, err } @@ -180,7 +193,7 @@ func (s *ioSrv) ExecIO(o *operation, name string, submit func(o *operation) erro return 0, err } // Wait for our request to complete. - err = fd.pd.Wait(int(o.mode)) + err = fd.pd.wait(int(o.mode)) if err == nil { // All is good. Extract our IO results and return. if o.errno != 0 { @@ -210,7 +223,7 @@ func (s *ioSrv) ExecIO(o *operation, name string, submit func(o *operation) erro <-o.errc } // Wait for cancelation to complete. - fd.pd.WaitCanceled(int(o.mode)) + fd.pd.waitCanceled(int(o.mode)) if o.errno != 0 { err = syscall.Errno(o.errno) if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled @@ -251,6 +264,7 @@ type netFD struct { sysfd syscall.Handle family int sotype int + isStream bool isConnected bool skipSyncNotif bool net string @@ -269,11 +283,11 @@ func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) return nil, initErr } onceStartServer.Do(startServer) - return &netFD{sysfd: sysfd, family: family, sotype: sotype, net: net}, nil + return &netFD{sysfd: sysfd, family: family, sotype: sotype, net: net, isStream: sotype == syscall.SOCK_STREAM}, nil } func (fd *netFD) init() error { - if err := fd.pd.Init(fd); err != nil { + if err := fd.pd.init(fd); err != nil { return err } if hasLoadSetFileCompletionNotificationModes { @@ -320,19 +334,20 @@ func (fd *netFD) setAddr(laddr, raddr Addr) { runtime.SetFinalizer(fd, (*netFD).Close) } -func (fd *netFD) connect(la, ra syscall.Sockaddr, deadline time.Time, cancel <-chan struct{}) error { +func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) error { // Do not need to call fd.writeLock here, // because fd is not yet accessible to user, // so no concurrent operations are possible. if err := fd.init(); err != nil { return err } - if !deadline.IsZero() { + if deadline, ok := ctx.Deadline(); ok && !deadline.IsZero() { fd.setWriteDeadline(deadline) defer fd.setWriteDeadline(noDeadline) } if !canUseConnectEx(fd.net) { - return os.NewSyscallError("connect", connectFunc(fd.sysfd, ra)) + err := connectFunc(fd.sysfd, ra) + return os.NewSyscallError("connect", err) } // ConnectEx windows API requires an unconnected, previously bound socket. if la == nil { @@ -351,26 +366,30 @@ func (fd *netFD) connect(la, ra syscall.Sockaddr, deadline time.Time, cancel <-c // Call ConnectEx API. o := &fd.wop o.sa = ra - if cancel != nil { - done := make(chan struct{}) - defer close(done) - go func() { - select { - case <-cancel: - // Force the runtime's poller to immediately give - // up waiting for writability. - fd.setWriteDeadline(aLongTimeAgo) - case <-done: - } - }() - } + + // Wait for the goroutine converting context.Done into a write timeout + // to exist, otherwise our caller might cancel the context and + // cause fd.setWriteDeadline(aLongTimeAgo) to cancel a successful dial. + done := make(chan bool) // must be unbuffered + defer func() { done <- true }() + go func() { + select { + case <-ctx.Done(): + // Force the runtime's poller to immediately give + // up waiting for writability. + fd.setWriteDeadline(aLongTimeAgo) + <-done + case <-done: + } + }() + _, err := wsrv.ExecIO(o, "ConnectEx", func(o *operation) error { return connectExFunc(o.fd.sysfd, o.sa, nil, 0, nil, &o.o) }) if err != nil { select { - case <-cancel: - return errCanceled + case <-ctx.Done(): + return mapErr(ctx.Err()) default: if _, ok := err.(syscall.Errno); ok { err = os.NewSyscallError("connectex", err) @@ -388,68 +407,19 @@ func (fd *netFD) destroy() { } // Poller may want to unregister fd in readiness notification mechanism, // so this must be executed before closeFunc. - fd.pd.Close() + fd.pd.close() closeFunc(fd.sysfd) fd.sysfd = syscall.InvalidHandle // no need for a finalizer anymore runtime.SetFinalizer(fd, nil) } -// Add a reference to this fd. -// Returns an error if the fd cannot be used. -func (fd *netFD) incref() error { - if !fd.fdmu.Incref() { - return errClosing - } - return nil -} - -// Remove a reference to this FD and close if we've been asked to do so -// (and there are no references left). -func (fd *netFD) decref() { - if fd.fdmu.Decref() { - fd.destroy() - } -} - -// Add a reference to this fd and lock for reading. -// Returns an error if the fd cannot be used. -func (fd *netFD) readLock() error { - if !fd.fdmu.RWLock(true) { - return errClosing - } - return nil -} - -// Unlock for reading and remove a reference to this FD. -func (fd *netFD) readUnlock() { - if fd.fdmu.RWUnlock(true) { - fd.destroy() - } -} - -// Add a reference to this fd and lock for writing. -// Returns an error if the fd cannot be used. -func (fd *netFD) writeLock() error { - if !fd.fdmu.RWLock(false) { - return errClosing - } - return nil -} - -// Unlock for writing and remove a reference to this FD. -func (fd *netFD) writeUnlock() { - if fd.fdmu.RWUnlock(false) { - fd.destroy() - } -} - func (fd *netFD) Close() error { - if !fd.fdmu.IncrefAndClose() { + if !fd.fdmu.increfAndClose() { return errClosing } // unblock pending reader and writer - fd.pd.Evict() + fd.pd.evict() fd.decref() return nil } @@ -483,7 +453,9 @@ func (fd *netFD) Read(buf []byte) (int, error) { if race.Enabled { race.Acquire(unsafe.Pointer(&ioSync)) } - err = fd.eofError(n, err) + if len(buf) != 0 { + err = fd.eofError(n, err) + } if _, ok := err.(syscall.Errno); ok { err = os.NewSyscallError("wsarecv", err) } @@ -537,6 +509,42 @@ func (fd *netFD) Write(buf []byte) (int, error) { return n, err } +func (c *conn) writeBuffers(v *Buffers) (int64, error) { + if !c.ok() { + return 0, syscall.EINVAL + } + n, err := c.fd.writeBuffers(v) + if err != nil { + return n, &OpError{Op: "WSASend", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err} + } + return n, nil +} + +func (fd *netFD) writeBuffers(buf *Buffers) (int64, error) { + if len(*buf) == 0 { + return 0, nil + } + if err := fd.writeLock(); err != nil { + return 0, err + } + defer fd.writeUnlock() + if race.Enabled { + race.ReleaseMerge(unsafe.Pointer(&ioSync)) + } + o := &fd.wop + o.InitBufs(buf) + n, err := wsrv.ExecIO(o, "WSASend", func(o *operation) error { + return syscall.WSASend(o.fd.sysfd, &o.bufs[0], uint32(len(*buf)), &o.qty, 0, &o.o, nil) + }) + o.ClearBufs() + if _, ok := err.(syscall.Errno); ok { + err = os.NewSyscallError("wsasend", err) + } + testHookDidWritev(n) + buf.consume(int64(n)) + return int64(n), err +} + func (fd *netFD) writeTo(buf []byte, sa syscall.Sockaddr) (int, error) { if len(buf) == 0 { return 0, nil @@ -579,7 +587,7 @@ func (fd *netFD) acceptOne(rawsa []syscall.RawSockaddrAny, o *operation) (*netFD o.handle = s o.rsan = int32(unsafe.Sizeof(rawsa[0])) _, err = rsrv.ExecIO(o, "AcceptEx", func(o *operation) error { - return syscall.AcceptEx(o.fd.sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o) + return acceptFunc(o.fd.sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o) }) if err != nil { netfd.Close() @@ -595,7 +603,7 @@ func (fd *netFD) acceptOne(rawsa []syscall.RawSockaddrAny, o *operation) (*netFD netfd.Close() return nil, os.NewSyscallError("setsockopt", err) } - + runtime.KeepAlive(fd) return netfd, nil } |