diff options
Diffstat (limited to 'libgo/go/netchan/import.go')
-rw-r--r-- | libgo/go/netchan/import.go | 51 |
1 files changed, 46 insertions, 5 deletions
diff --git a/libgo/go/netchan/import.go b/libgo/go/netchan/import.go index 0a700ca2b99..ec17d97774b 100644 --- a/libgo/go/netchan/import.go +++ b/libgo/go/netchan/import.go @@ -11,6 +11,7 @@ import ( "os" "reflect" "sync" + "time" ) // Import @@ -31,6 +32,9 @@ type Importer struct { chans map[int]*netChan errors chan os.Error maxId int + mu sync.Mutex // protects remaining fields + unacked int64 // number of unacknowledged sends. + seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu } // NewImporter creates a new Importer object to import a set of channels @@ -42,6 +46,7 @@ func NewImporter(conn io.ReadWriter) *Importer { imp.chans = make(map[int]*netChan) imp.names = make(map[string]*netChan) imp.errors = make(chan os.Error, 10) + imp.unacked = 0 go imp.run() return imp } @@ -80,8 +85,10 @@ func (imp *Importer) run() { for { *hdr = header{} if e := imp.decode(hdrValue); e != nil { - impLog("header:", e) - imp.shutdown() + if e != os.EOF { + impLog("header:", e) + imp.shutdown() + } return } switch hdr.PayloadType { @@ -95,7 +102,7 @@ func (imp *Importer) run() { if err.Error != "" { impLog("response error:", err.Error) select { - case imp.errors <- os.ErrorString(err.Error): + case imp.errors <- os.NewError(err.Error): continue // errors are not acknowledged default: imp.shutdown() @@ -114,6 +121,9 @@ func (imp *Importer) run() { nch := imp.getChan(hdr.Id, true) if nch != nil { nch.acked() + imp.mu.Lock() + imp.unacked-- + imp.mu.Unlock() } continue default: @@ -193,7 +203,7 @@ func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size, defer imp.chanLock.Unlock() _, present := imp.names[name] if present { - return os.ErrorString("channel name already being imported:" + name) + return os.NewError("channel name already being imported:" + name) } if size < 1 { size = 1 @@ -220,10 +230,17 @@ func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size, } return } + // We hold the lock during transmission to guarantee messages are + // sent in order. + imp.mu.Lock() + imp.unacked++ + imp.seqLock.Lock() + imp.mu.Unlock() if err = imp.encode(hdr, payData, val.Interface()); err != nil { impLog("error encoding client send:", err) return } + imp.seqLock.Unlock() } }() } @@ -237,10 +254,34 @@ func (imp *Importer) Hangup(name string) os.Error { defer imp.chanLock.Unlock() nc := imp.names[name] if nc == nil { - return os.ErrorString("netchan import: hangup: no such channel: " + name) + return os.NewError("netchan import: hangup: no such channel: " + name) } imp.names[name] = nil, false imp.chans[nc.id] = nil, false nc.close() return nil } + +func (imp *Importer) unackedCount() int64 { + imp.mu.Lock() + n := imp.unacked + imp.mu.Unlock() + return n +} + +// Drain waits until all messages sent from this exporter/importer, including +// those not yet sent to any server and possibly including those sent while +// Drain was executing, have been received by the exporter. In short, it +// waits until all the importer's messages have been received. +// If the timeout (measured in nanoseconds) is positive and Drain takes +// longer than that to complete, an error is returned. +func (imp *Importer) Drain(timeout int64) os.Error { + startTime := time.Nanoseconds() + for imp.unackedCount() > 0 { + if timeout > 0 && time.Nanoseconds()-startTime >= timeout { + return os.NewError("timeout") + } + time.Sleep(100 * 1e6) + } + return nil +} |