summaryrefslogtreecommitdiff
path: root/libgo/go/netchan/import.go
diff options
context:
space:
mode:
Diffstat (limited to 'libgo/go/netchan/import.go')
-rw-r--r--libgo/go/netchan/import.go51
1 files changed, 46 insertions, 5 deletions
diff --git a/libgo/go/netchan/import.go b/libgo/go/netchan/import.go
index 0a700ca2b99..ec17d97774b 100644
--- a/libgo/go/netchan/import.go
+++ b/libgo/go/netchan/import.go
@@ -11,6 +11,7 @@ import (
"os"
"reflect"
"sync"
+ "time"
)
// Import
@@ -31,6 +32,9 @@ type Importer struct {
chans map[int]*netChan
errors chan os.Error
maxId int
+ mu sync.Mutex // protects remaining fields
+ unacked int64 // number of unacknowledged sends.
+ seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu
}
// NewImporter creates a new Importer object to import a set of channels
@@ -42,6 +46,7 @@ func NewImporter(conn io.ReadWriter) *Importer {
imp.chans = make(map[int]*netChan)
imp.names = make(map[string]*netChan)
imp.errors = make(chan os.Error, 10)
+ imp.unacked = 0
go imp.run()
return imp
}
@@ -80,8 +85,10 @@ func (imp *Importer) run() {
for {
*hdr = header{}
if e := imp.decode(hdrValue); e != nil {
- impLog("header:", e)
- imp.shutdown()
+ if e != os.EOF {
+ impLog("header:", e)
+ imp.shutdown()
+ }
return
}
switch hdr.PayloadType {
@@ -95,7 +102,7 @@ func (imp *Importer) run() {
if err.Error != "" {
impLog("response error:", err.Error)
select {
- case imp.errors <- os.ErrorString(err.Error):
+ case imp.errors <- os.NewError(err.Error):
continue // errors are not acknowledged
default:
imp.shutdown()
@@ -114,6 +121,9 @@ func (imp *Importer) run() {
nch := imp.getChan(hdr.Id, true)
if nch != nil {
nch.acked()
+ imp.mu.Lock()
+ imp.unacked--
+ imp.mu.Unlock()
}
continue
default:
@@ -193,7 +203,7 @@ func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size,
defer imp.chanLock.Unlock()
_, present := imp.names[name]
if present {
- return os.ErrorString("channel name already being imported:" + name)
+ return os.NewError("channel name already being imported:" + name)
}
if size < 1 {
size = 1
@@ -220,10 +230,17 @@ func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size,
}
return
}
+ // We hold the lock during transmission to guarantee messages are
+ // sent in order.
+ imp.mu.Lock()
+ imp.unacked++
+ imp.seqLock.Lock()
+ imp.mu.Unlock()
if err = imp.encode(hdr, payData, val.Interface()); err != nil {
impLog("error encoding client send:", err)
return
}
+ imp.seqLock.Unlock()
}
}()
}
@@ -237,10 +254,34 @@ func (imp *Importer) Hangup(name string) os.Error {
defer imp.chanLock.Unlock()
nc := imp.names[name]
if nc == nil {
- return os.ErrorString("netchan import: hangup: no such channel: " + name)
+ return os.NewError("netchan import: hangup: no such channel: " + name)
}
imp.names[name] = nil, false
imp.chans[nc.id] = nil, false
nc.close()
return nil
}
+
+func (imp *Importer) unackedCount() int64 {
+ imp.mu.Lock()
+ n := imp.unacked
+ imp.mu.Unlock()
+ return n
+}
+
+// Drain waits until all messages sent from this exporter/importer, including
+// those not yet sent to any server and possibly including those sent while
+// Drain was executing, have been received by the exporter. In short, it
+// waits until all the importer's messages have been received.
+// If the timeout (measured in nanoseconds) is positive and Drain takes
+// longer than that to complete, an error is returned.
+func (imp *Importer) Drain(timeout int64) os.Error {
+ startTime := time.Nanoseconds()
+ for imp.unackedCount() > 0 {
+ if timeout > 0 && time.Nanoseconds()-startTime >= timeout {
+ return os.NewError("timeout")
+ }
+ time.Sleep(100 * 1e6)
+ }
+ return nil
+}