diff options
Diffstat (limited to 'libgo/go/net/fd_plan9.go')
-rw-r--r-- | libgo/go/net/fd_plan9.go | 220 |
1 files changed, 144 insertions, 76 deletions
diff --git a/libgo/go/net/fd_plan9.go b/libgo/go/net/fd_plan9.go index cec88609d0..300d8c4543 100644 --- a/libgo/go/net/fd_plan9.go +++ b/libgo/go/net/fd_plan9.go @@ -1,4 +1,4 @@ -// Copyright 2009 The Go Authors. All rights reserved. +// Copyright 2009 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. @@ -7,21 +7,37 @@ package net import ( "io" "os" + "sync/atomic" "syscall" "time" ) +type atomicBool int32 + +func (b *atomicBool) isSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 } +func (b *atomicBool) setFalse() { atomic.StoreInt32((*int32)(b), 0) } +func (b *atomicBool) setTrue() { atomic.StoreInt32((*int32)(b), 1) } + // Network file descriptor. type netFD struct { // locking/lifetime of sysfd + serialize access to Read and Write methods fdmu fdMutex // immutable until Close - net string - n string - dir string - ctl, data *os.File - laddr, raddr Addr + net string + n string + dir string + listen, ctl, data *os.File + laddr, raddr Addr + isStream bool + + // deadlines + raio *asyncIO + waio *asyncIO + rtimer *time.Timer + wtimer *time.Timer + rtimedout atomicBool // set true when read deadline has been reached + wtimedout atomicBool // set true when write deadline has been reached } var ( @@ -32,14 +48,16 @@ func sysInit() { netdir = "/net" } -func dial(net string, ra Addr, dialer func(time.Time) (Conn, error), deadline time.Time) (Conn, error) { - // On plan9, use the relatively inefficient - // goroutine-racing implementation. - return dialChannel(net, ra, dialer, deadline) -} - -func newFD(net, name string, ctl, data *os.File, laddr, raddr Addr) (*netFD, error) { - return &netFD{net: net, n: name, dir: netdir + "/" + net + "/" + name, ctl: ctl, data: data, laddr: laddr, raddr: raddr}, nil +func newFD(net, name string, listen, ctl, data *os.File, laddr, raddr Addr) (*netFD, error) { + return &netFD{ + net: net, + n: name, + dir: netdir + "/" + net + "/" + name, + listen: listen, + ctl: ctl, data: data, + laddr: laddr, + raddr: raddr, + }, nil } func (fd *netFD) init() error { @@ -70,60 +88,20 @@ func (fd *netFD) destroy() { err = err1 } } + if fd.listen != nil { + if err1 := fd.listen.Close(); err1 != nil && err == nil { + err = err1 + } + } fd.ctl = nil fd.data = nil -} - -// Add a reference to this fd. -// Returns an error if the fd cannot be used. -func (fd *netFD) incref() error { - if !fd.fdmu.Incref() { - return errClosing - } - return nil -} - -// Remove a reference to this FD and close if we've been asked to do so -// (and there are no references left). -func (fd *netFD) decref() { - if fd.fdmu.Decref() { - fd.destroy() - } -} - -// Add a reference to this fd and lock for reading. -// Returns an error if the fd cannot be used. -func (fd *netFD) readLock() error { - if !fd.fdmu.RWLock(true) { - return errClosing - } - return nil -} - -// Unlock for reading and remove a reference to this FD. -func (fd *netFD) readUnlock() { - if fd.fdmu.RWUnlock(true) { - fd.destroy() - } -} - -// Add a reference to this fd and lock for writing. -// Returns an error if the fd cannot be used. -func (fd *netFD) writeLock() error { - if !fd.fdmu.RWLock(false) { - return errClosing - } - return nil -} - -// Unlock for writing and remove a reference to this FD. -func (fd *netFD) writeUnlock() { - if fd.fdmu.RWUnlock(false) { - fd.destroy() - } + fd.listen = nil } func (fd *netFD) Read(b []byte) (n int, err error) { + if fd.rtimedout.isSet() { + return 0, errTimeout + } if !fd.ok() || fd.data == nil { return 0, syscall.EINVAL } @@ -131,7 +109,18 @@ func (fd *netFD) Read(b []byte) (n int, err error) { return 0, err } defer fd.readUnlock() - n, err = fd.data.Read(b) + if len(b) == 0 { + return 0, nil + } + fd.raio = newAsyncIO(fd.data.Read, b) + n, err = fd.raio.Wait() + fd.raio = nil + if isHangup(err) { + err = io.EOF + } + if isInterrupted(err) { + err = errTimeout + } if fd.net == "udp" && err == io.EOF { n = 0 err = nil @@ -140,6 +129,9 @@ func (fd *netFD) Read(b []byte) (n int, err error) { } func (fd *netFD) Write(b []byte) (n int, err error) { + if fd.wtimedout.isSet() { + return 0, errTimeout + } if !fd.ok() || fd.data == nil { return 0, syscall.EINVAL } @@ -147,7 +139,13 @@ func (fd *netFD) Write(b []byte) (n int, err error) { return 0, err } defer fd.writeUnlock() - return fd.data.Write(b) + fd.waio = newAsyncIO(fd.data.Write, b) + n, err = fd.waio.Wait() + fd.waio = nil + if isInterrupted(err) { + err = errTimeout + } + return } func (fd *netFD) closeRead() error { @@ -165,7 +163,7 @@ func (fd *netFD) closeWrite() error { } func (fd *netFD) Close() error { - if !fd.fdmu.IncrefAndClose() { + if !fd.fdmu.increfAndClose() { return errClosing } if !fd.ok() { @@ -173,11 +171,10 @@ func (fd *netFD) Close() error { } if fd.net == "tcp" { // The following line is required to unblock Reads. - // For some reason, WriteString returns an error: - // "write /net/tcp/39/listen: inappropriate use of fd" - // But without it, Reads on dead conns hang forever. - // See Issue 9554. - fd.ctl.WriteString("hangup") + _, err := fd.ctl.WriteString("close") + if err != nil { + return err + } } err := fd.ctl.Close() if fd.data != nil { @@ -185,8 +182,14 @@ func (fd *netFD) Close() error { err = err1 } } + if fd.listen != nil { + if err1 := fd.listen.Close(); err1 != nil && err == nil { + err = err1 + } + } fd.ctl = nil fd.data = nil + fd.listen = nil return err } @@ -206,9 +209,7 @@ func (l *TCPListener) dup() (*os.File, error) { } func (fd *netFD) file(f *os.File, s string) (*os.File, error) { - syscall.ForkLock.RLock() dfd, err := syscall.Dup(int(f.Fd()), -1) - syscall.ForkLock.RUnlock() if err != nil { return nil, os.NewSyscallError("dup", err) } @@ -216,15 +217,74 @@ func (fd *netFD) file(f *os.File, s string) (*os.File, error) { } func (fd *netFD) setDeadline(t time.Time) error { - return syscall.EPLAN9 + return setDeadlineImpl(fd, t, 'r'+'w') } func (fd *netFD) setReadDeadline(t time.Time) error { - return syscall.EPLAN9 + return setDeadlineImpl(fd, t, 'r') } func (fd *netFD) setWriteDeadline(t time.Time) error { - return syscall.EPLAN9 + return setDeadlineImpl(fd, t, 'w') +} + +func setDeadlineImpl(fd *netFD, t time.Time, mode int) error { + d := t.Sub(time.Now()) + if mode == 'r' || mode == 'r'+'w' { + fd.rtimedout.setFalse() + } + if mode == 'w' || mode == 'r'+'w' { + fd.wtimedout.setFalse() + } + if t.IsZero() || d < 0 { + // Stop timer + if mode == 'r' || mode == 'r'+'w' { + if fd.rtimer != nil { + fd.rtimer.Stop() + } + fd.rtimer = nil + } + if mode == 'w' || mode == 'r'+'w' { + if fd.wtimer != nil { + fd.wtimer.Stop() + } + fd.wtimer = nil + } + } else { + // Interrupt I/O operation once timer has expired + if mode == 'r' || mode == 'r'+'w' { + fd.rtimer = time.AfterFunc(d, func() { + fd.rtimedout.setTrue() + if fd.raio != nil { + fd.raio.Cancel() + } + }) + } + if mode == 'w' || mode == 'r'+'w' { + fd.wtimer = time.AfterFunc(d, func() { + fd.wtimedout.setTrue() + if fd.waio != nil { + fd.waio.Cancel() + } + }) + } + } + if !t.IsZero() && d < 0 { + // Interrupt current I/O operation + if mode == 'r' || mode == 'r'+'w' { + fd.rtimedout.setTrue() + if fd.raio != nil { + fd.raio.Cancel() + } + } + if mode == 'w' || mode == 'r'+'w' { + fd.wtimedout.setTrue() + if fd.waio != nil { + fd.waio.Cancel() + } + } + } + return nil } func setReadBuffer(fd *netFD, bytes int) error { @@ -234,3 +294,11 @@ func setReadBuffer(fd *netFD, bytes int) error { func setWriteBuffer(fd *netFD, bytes int) error { return syscall.EPLAN9 } + +func isHangup(err error) bool { + return err != nil && stringsHasSuffix(err.Error(), "Hangup") +} + +func isInterrupted(err error) bool { + return err != nil && stringsHasSuffix(err.Error(), "interrupted") +} |